You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/08 12:03:11 UTC

[pulsar] branch master updated: [pulsar-clients] Support nested struct for GenericRecord (#4177)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 78502a3  [pulsar-clients] Support nested struct for GenericRecord (#4177)
78502a3 is described below

commit 78502a3cfae0d789cea667c4829830487517b7ea
Author: tuteng <eg...@gmail.com>
AuthorDate: Wed May 8 20:03:04 2019 +0800

    [pulsar-clients] Support nested struct for GenericRecord (#4177)
    
    ### Motivation
    
    Currently, GenericRecordBuilder supports only primitive field types (e.g. int, long, string); it does not support struct (nested record) types.
    
    ### Modifications
    
    Support nested structs in GenericRecordBuilder: a record field can now be declared with a GenericSchema (AVRO) and set to another GenericRecord.
    
    ### Verifying this change
    
    Unit tests pass (new nested-record cases were added to SchemaBuilderTest).
---
 .../client/api/schema/RecordSchemaBuilder.java     |   9 +
 .../client/impl/schema/FieldSchemaBuilderImpl.java |  17 ++
 .../impl/schema/RecordSchemaBuilderImpl.java       |   8 +
 .../impl/schema/generic/AvroRecordBuilderImpl.java |  29 +-
 .../client/impl/schema/SchemaBuilderTest.java      | 292 ++++++++++++++++++++-
 5 files changed, 346 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java
index a3e34d7..7526ecf 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java
@@ -44,6 +44,15 @@ public interface RecordSchemaBuilder {
     FieldSchemaBuilder field(String fieldName);
 
     /**
+     * Add a field with the given name, described by the given generic (nested record) schema, to the record.
+     *
+     * @param fieldName name of the field
+     * @param genericSchema generic schema describing the nested record held by the field
+     * @return field schema builder to build the field.
+     */
+    FieldSchemaBuilder field(String fieldName, GenericSchema genericSchema);
+
+    /**
      * Add doc to the record schema.
      *
      * @param doc documentation
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
index ab6f134..88283bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 import java.util.HashMap;
@@ -28,6 +29,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaBuilder;
 import org.apache.pulsar.client.api.schema.FieldSchemaBuilder;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.common.schema.SchemaType;
 
 /**
@@ -44,8 +47,15 @@ class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImp
     private String doc;
     private String[] aliases;
 
+    private GenericSchema genericSchema;
+
     FieldSchemaBuilderImpl(String fieldName) {
+        this(fieldName, null);
+    }
+
+    FieldSchemaBuilderImpl(String fieldName, GenericSchema genericSchema) {
         this.fieldName = fieldName;
+        this.genericSchema = genericSchema;
     }
 
     @Override
@@ -132,6 +142,13 @@ class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImp
             case TIMESTAMP:
                 baseSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
                 break;
+            case AVRO:
+                checkArgument(genericSchema.getSchemaInfo().getType() == SchemaType.AVRO,
+                        "The field is expected to be using AVRO schema but "
+                                + genericSchema.getSchemaInfo().getType() + " schema is found");
+                GenericAvroSchema genericAvroSchema = (GenericAvroSchema) genericSchema;
+                baseSchema = genericAvroSchema.getAvroSchema();
+                break;
             default:
                 throw new RuntimeException("Schema `" + type + "` is not supported to be used as a field for now");
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
index 9abede8..ee9f0cb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pulsar.client.api.schema.FieldSchemaBuilder;
+import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -61,6 +62,13 @@ public class RecordSchemaBuilderImpl implements RecordSchemaBuilder {
     }
 
     @Override
+    public FieldSchemaBuilder field(String fieldName, GenericSchema genericSchema) {
+        FieldSchemaBuilderImpl field = new FieldSchemaBuilderImpl(fieldName, genericSchema);
+        fields.add(field);
+        return field;
+    }
+
+    @Override
     public RecordSchemaBuilder doc(String doc) {
         this.doc = doc;
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AvroRecordBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AvroRecordBuilderImpl.java
index a8b2dd7..7278ab5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AvroRecordBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AvroRecordBuilderImpl.java
@@ -22,6 +22,8 @@ import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
 
+import java.util.List;
+
 /**
  * Builder to build {@link org.apache.pulsar.client.api.schema.GenericRecord}.
  */
@@ -45,7 +47,15 @@ class AvroRecordBuilderImpl implements GenericRecordBuilder {
      */
     @Override
     public GenericRecordBuilder set(String fieldName, Object value) {
-        avroRecordBuilder.set(fieldName, value);
+        if (value instanceof GenericRecord) {
+            if (value instanceof GenericAvroRecord) {
+                avroRecordBuilder.set(fieldName, ((GenericAvroRecord)value).getAvroRecord());
+            } else {
+                throw new IllegalArgumentException("Avro Record Builder doesn't support non-avro record as a field");
+            }
+        } else {
+            avroRecordBuilder.set(fieldName, value);
+        }
         return this;
     }
 
@@ -70,10 +80,19 @@ class AvroRecordBuilderImpl implements GenericRecordBuilder {
      * @return a reference to the RecordBuilder.
      */
     protected GenericRecordBuilder set(int index, Object value) {
-        avroRecordBuilder.set(
-            genericSchema.getAvroSchema().getFields().get(index),
-            value
-        );
+        if (value instanceof GenericRecord) {
+            if (value instanceof GenericAvroRecord) {
+                avroRecordBuilder.set(genericSchema.getAvroSchema().getFields().get(index),
+                        ((GenericAvroRecord) value).getAvroRecord());
+            } else {
+                throw new IllegalArgumentException("Avro Record Builder doesn't support non-avro record as a field");
+            }
+        } else {
+            avroRecordBuilder.set(
+                    genericSchema.getAvroSchema().getFields().get(index),
+                    value
+            );
+        }
         return this;
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index 207fd98..9ce3d5c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -21,12 +21,12 @@ package org.apache.pulsar.client.impl.schema;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 import org.apache.avro.reflect.Nullable;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
-import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.*;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.annotations.Test;
@@ -59,6 +59,29 @@ public class SchemaBuilderTest {
         private double doubleField;
     }
 
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    private static class People {
+        private People1 people1;
+        private People2 people2;
+        private String name;
+    }
+
+    @Data
+    private static class People1 {
+        private int age;
+        private int height;
+        private String name;
+    }
+
+    @Data
+    private static class People2 {
+        private int age;
+        private int height;
+        private String name;
+    }
+
     @Test
     public void testAllOptionalFieldsSchema() {
         RecordSchemaBuilder recordSchemaBuilder =
@@ -200,4 +223,265 @@ public class SchemaBuilderTest {
         assertEquals(0.7f, fields.floatField);
         assertEquals(1.34d, fields.doubleField);
     }
+
+    @Test
+    public void testGenericRecordBuilderAvroByFieldname() {
+        RecordSchemaBuilder people1SchemaBuilder = SchemaBuilder.record("People1");
+        people1SchemaBuilder.field("age").type(SchemaType.INT32);
+        people1SchemaBuilder.field("height").type(SchemaType.INT32);
+        people1SchemaBuilder.field("name").type(SchemaType.STRING);
+
+        // Materialize the People1 definition as a generic AVRO schema.
+        SchemaInfo people1SchemaInfo = people1SchemaBuilder.build(SchemaType.AVRO);
+        GenericSchema people1Schema = Schema.generic(people1SchemaInfo);
+
+        // Populate the first nested record, addressing each field by name.
+        GenericRecordBuilder people1RecordBuilder = people1Schema.newRecordBuilder();
+        people1RecordBuilder.set("age", 20);
+        people1RecordBuilder.set("height", 180);
+        people1RecordBuilder.set("name", "people1");
+        GenericRecord people1GenericRecord = people1RecordBuilder.build();
+
+        RecordSchemaBuilder people2SchemaBuilder = SchemaBuilder.record("People2");
+        people2SchemaBuilder.field("age").type(SchemaType.INT32);
+        people2SchemaBuilder.field("height").type(SchemaType.INT32);
+        people2SchemaBuilder.field("name").type(SchemaType.STRING);
+
+        SchemaInfo people2SchemaInfo = people2SchemaBuilder.build(SchemaType.AVRO);
+        GenericSchema people2Schema = Schema.generic(people2SchemaInfo);
+
+        GenericRecordBuilder people2RecordBuilder = people2Schema.newRecordBuilder();
+        people2RecordBuilder.set("age", 20);
+        people2RecordBuilder.set("height", 180);
+        people2RecordBuilder.set("name", "people2");
+        GenericRecord people2GenericRecord = people2RecordBuilder.build();
+
+        RecordSchemaBuilder peopleSchemaBuilder = SchemaBuilder.record("People");
+        peopleSchemaBuilder.field("people1", people1Schema).type(SchemaType.AVRO);
+        peopleSchemaBuilder.field("people2", people2Schema).type(SchemaType.AVRO);
+        peopleSchemaBuilder.field("name").type(SchemaType.STRING);
+
+        // Build the outer schema, which embeds People1 and People2 as nested AVRO record fields.
+        SchemaInfo schemaInfo = peopleSchemaBuilder.build(SchemaType.AVRO);
+
+        GenericSchema peopleSchema = Schema.generic(schemaInfo);
+        GenericRecordBuilder peopleRecordBuilder = peopleSchema.newRecordBuilder();
+        peopleRecordBuilder.set("people1", people1GenericRecord);
+        peopleRecordBuilder.set("people2", people2GenericRecord);
+        peopleRecordBuilder.set("name", "people");
+        GenericRecord peopleRecord = peopleRecordBuilder.build();
+        // Round-trip the outer record through the generic schema's encode/decode.
+        byte[] peopleEncode = peopleSchema.encode(peopleRecord);
+
+        GenericRecord people = (GenericRecord) peopleSchema.decode(peopleEncode);
+        // Compare field by field; nested values are read through the decoded outer record.
+        assertEquals(people.getFields(), peopleRecord.getFields());
+        assertEquals(people.getField("name"), peopleRecord.getField("name"));
+        assertEquals(((GenericRecord)people.getField("people1")).getField("age"),
+                people1GenericRecord.getField("age"));
+        assertEquals(((GenericRecord)people.getField("people1")).getField("height"),
+                people1GenericRecord.getField("height"));
+        assertEquals(((GenericRecord)people.getField("people1")).getField("name"),
+                people1GenericRecord.getField("name"));
+        assertEquals(((GenericRecord)people.getField("people2")).getField("age"),
+                people2GenericRecord.getField("age"));
+        assertEquals(((GenericRecord)people.getField("people2")).getField("height"),
+                people2GenericRecord.getField("height"));
+        assertEquals(((GenericRecord)people.getField("people2")).getField("name"),
+                people2GenericRecord.getField("name"));
+
+    }
+
+    @Test
+    public void testGenericRecordBuilderAvroByFieldnamePojo() {
+        RecordSchemaBuilder people1SchemaBuilder = SchemaBuilder.record("People1");
+        people1SchemaBuilder.field("age").type(SchemaType.INT32);
+        people1SchemaBuilder.field("height").type(SchemaType.INT32);
+        people1SchemaBuilder.field("name").type(SchemaType.STRING);
+
+        // Materialize the People1 definition as a generic AVRO schema.
+        SchemaInfo people1SchemaInfo = people1SchemaBuilder.build(SchemaType.AVRO);
+        GenericSchema people1Schema = Schema.generic(people1SchemaInfo);
+
+        // Populate the first nested record, addressing each field by name.
+        GenericRecordBuilder people1RecordBuilder = people1Schema.newRecordBuilder();
+        people1RecordBuilder.set("age", 20);
+        people1RecordBuilder.set("height", 180);
+        people1RecordBuilder.set("name", "people1");
+        GenericRecord people1GenericRecord = people1RecordBuilder.build();
+
+        RecordSchemaBuilder people2SchemaBuilder = SchemaBuilder.record("People2");
+        people2SchemaBuilder.field("age").type(SchemaType.INT32);
+        people2SchemaBuilder.field("height").type(SchemaType.INT32);
+        people2SchemaBuilder.field("name").type(SchemaType.STRING);
+
+        SchemaInfo people2SchemaInfo = people2SchemaBuilder.build(SchemaType.AVRO);
+        GenericSchema people2Schema = Schema.generic(people2SchemaInfo);
+
+        GenericRecordBuilder people2RecordBuilder = people2Schema.newRecordBuilder();
+        people2RecordBuilder.set("age", 20);
+        people2RecordBuilder.set("height", 180);
+        people2RecordBuilder.set("name", "people2");
+        GenericRecord people2GenericRecord = people2RecordBuilder.build();
+
+        RecordSchemaBuilder peopleSchemaBuilder = SchemaBuilder.record("People");
+        peopleSchemaBuilder.field("people1", people1Schema).type(SchemaType.AVRO);
+        peopleSchemaBuilder.field("people2", people2Schema).type(SchemaType.AVRO);
+        peopleSchemaBuilder.field("name").type(SchemaType.STRING);
+
+        // Build the outer schema, which embeds People1 and People2 as nested AVRO record fields.
+        SchemaInfo schemaInfo = peopleSchemaBuilder.build(SchemaType.AVRO);
+
+        GenericSchema peopleSchema = Schema.generic(schemaInfo);
+        GenericRecordBuilder peopleRecordBuilder = peopleSchema.newRecordBuilder();
+        peopleRecordBuilder.set("people1", people1GenericRecord);
+        peopleRecordBuilder.set("people2", people2GenericRecord);
+        peopleRecordBuilder.set("name", "people");
+        GenericRecord peopleRecord = peopleRecordBuilder.build();
+
+        byte[] peopleEncode = peopleSchema.encode(peopleRecord);
+        // Decode the same bytes with a POJO-based AVRO schema built from People.class.
+        Schema<People> peopleDecodeSchema = Schema.AVRO(
+                SchemaDefinition.<People>builder().withPojo(People.class).withAlwaysAllowNull(false).build());
+        People people = peopleDecodeSchema.decode(peopleEncode);
+        // The POJO's fields must match the values that were set on the generic records.
+        assertEquals(people.name, peopleRecord.getField("name"));
+        assertEquals(people.getPeople1().age, people1GenericRecord.getField("age"));
+        assertEquals(people.getPeople1().height, people1GenericRecord.getField("height"));
+        assertEquals(people.getPeople1().name, people1GenericRecord.getField("name"));
+        assertEquals(people.getPeople2().age, people2GenericRecord.getField("age"));
+        assertEquals(people.getPeople2().height, people2GenericRecord.getField("height"));
+        assertEquals(people.getPeople2().name, people2GenericRecord.getField("name"));
+
+    }
+
+    @Test
+    public void testGenericRecordBuilderAvroByFieldIndex() {
+        RecordSchemaBuilder people1SchemaBuilder = SchemaBuilder.record("People1");
+        people1SchemaBuilder.field("age").type(SchemaType.INT32);
+        people1SchemaBuilder.field("height").type(SchemaType.INT32);
+        people1SchemaBuilder.field("name").type(SchemaType.STRING);
+
+        // Materialize the People1 definition as a generic AVRO schema.
+        SchemaInfo people1SchemaInfo = people1SchemaBuilder.build(SchemaType.AVRO);
+        GenericSchema<GenericRecord> people1Schema = Schema.generic(people1SchemaInfo);
+
+        // Populate the first nested record via Field objects taken by position from the schema.
+        GenericRecordBuilder people1RecordBuilder = people1Schema.newRecordBuilder();
+        people1RecordBuilder.set(people1Schema.getFields().get(0), 20);
+        people1RecordBuilder.set(people1Schema.getFields().get(1), 180);
+        people1RecordBuilder.set(people1Schema.getFields().get(2), "people1");
+        GenericRecord people1GenericRecord = people1RecordBuilder.build();
+
+        RecordSchemaBuilder people2SchemaBuilder = SchemaBuilder.record("People2");
+        people2SchemaBuilder.field("age").type(SchemaType.INT32);
+        people2SchemaBuilder.field("height").type(SchemaType.INT32);
+        people2SchemaBuilder.field("name").type(SchemaType.STRING);
+
+        SchemaInfo people2SchemaInfo = people2SchemaBuilder.build(SchemaType.AVRO);
+        GenericSchema<GenericRecord> people2Schema = Schema.generic(people2SchemaInfo);
+
+        GenericRecordBuilder people2RecordBuilder = people2Schema.newRecordBuilder();
+        people2RecordBuilder.set(people2Schema.getFields().get(0), 20);
+        people2RecordBuilder.set(people2Schema.getFields().get(1), 180);
+        people2RecordBuilder.set(people2Schema.getFields().get(2), "people2");
+        GenericRecord people2GenericRecord = people2RecordBuilder.build();
+
+        RecordSchemaBuilder peopleSchemaBuilder = SchemaBuilder.record("People");
+        peopleSchemaBuilder.field("people1", people1Schema).type(SchemaType.AVRO);
+        peopleSchemaBuilder.field("people2", people2Schema).type(SchemaType.AVRO);
+        peopleSchemaBuilder.field("name").type(SchemaType.STRING);
+
+        // Build the outer schema, which embeds People1 and People2 as nested AVRO record fields.
+        SchemaInfo schemaInfo = peopleSchemaBuilder.build(SchemaType.AVRO);
+
+        GenericSchema<GenericRecord> peopleSchema = Schema.generic(schemaInfo);
+        GenericRecordBuilder peopleRecordBuilder = peopleSchema.newRecordBuilder();
+        peopleRecordBuilder.set(peopleSchema.getFields().get(0), people1GenericRecord);
+        peopleRecordBuilder.set(peopleSchema.getFields().get(1), people2GenericRecord);
+        peopleRecordBuilder.set(peopleSchema.getFields().get(2), "people");
+        GenericRecord peopleRecord = peopleRecordBuilder.build();
+        // Round-trip the outer record through the generic schema's encode/decode.
+        byte[] peopleEncode = peopleSchema.encode(peopleRecord);
+
+        GenericRecord people = (GenericRecord) peopleSchema.decode(peopleEncode);
+        // Compare field by field; nested values are read through the decoded outer record.
+        assertEquals(people.getFields(), peopleRecord.getFields());
+        assertEquals(people.getField("name"), peopleRecord.getField("name"));
+        assertEquals(((GenericRecord)people.getField("people1")).getField("age"),
+                people1GenericRecord.getField("age"));
+        assertEquals(((GenericRecord)people.getField("people1")).getField("height"),
+                people1GenericRecord.getField("height"));
+        assertEquals(((GenericRecord)people.getField("people1")).getField("name"),
+                people1GenericRecord.getField("name"));
+        assertEquals(((GenericRecord)people.getField("people2")).getField("age"),
+                people2GenericRecord.getField("age"));
+        assertEquals(((GenericRecord)people.getField("people2")).getField("height"),
+                people2GenericRecord.getField("height"));
+        assertEquals(((GenericRecord)people.getField("people2")).getField("name"),
+                people2GenericRecord.getField("name"));
+
+    }
+
+    @Test
+    public void testGenericRecordBuilderAvroByFieldIndexPojo() {
+        RecordSchemaBuilder people1SchemaBuilder = SchemaBuilder.record("People1");
+        people1SchemaBuilder.field("age").type(SchemaType.INT32);
+        people1SchemaBuilder.field("height").type(SchemaType.INT32);
+        people1SchemaBuilder.field("name").type(SchemaType.STRING);
+
+        // Materialize the People1 definition as a generic AVRO schema.
+        SchemaInfo people1SchemaInfo = people1SchemaBuilder.build(SchemaType.AVRO);
+        GenericSchema<GenericRecord> people1Schema = Schema.generic(people1SchemaInfo);
+
+        // Populate the first nested record via Field objects taken by position from the schema.
+        GenericRecordBuilder people1RecordBuilder = people1Schema.newRecordBuilder();
+        people1RecordBuilder.set(people1Schema.getFields().get(0), 20);
+        people1RecordBuilder.set(people1Schema.getFields().get(1), 180);
+        people1RecordBuilder.set(people1Schema.getFields().get(2), "people1");
+        GenericRecord people1GenericRecord = people1RecordBuilder.build();
+
+        RecordSchemaBuilder people2SchemaBuilder = SchemaBuilder.record("People2");
+        people2SchemaBuilder.field("age").type(SchemaType.INT32);
+        people2SchemaBuilder.field("height").type(SchemaType.INT32);
+        people2SchemaBuilder.field("name").type(SchemaType.STRING);
+
+        SchemaInfo people2SchemaInfo = people2SchemaBuilder.build(SchemaType.AVRO);
+        GenericSchema<GenericRecord> people2Schema = Schema.generic(people2SchemaInfo);
+
+        GenericRecordBuilder people2RecordBuilder = people2Schema.newRecordBuilder();
+        people2RecordBuilder.set(people2Schema.getFields().get(0), 20);
+        people2RecordBuilder.set(people2Schema.getFields().get(1), 180);
+        people2RecordBuilder.set(people2Schema.getFields().get(2), "people2");
+        GenericRecord people2GenericRecord = people2RecordBuilder.build();
+
+        RecordSchemaBuilder peopleSchemaBuilder = SchemaBuilder.record("People");
+        peopleSchemaBuilder.field("people1", people1Schema).type(SchemaType.AVRO);
+        peopleSchemaBuilder.field("people2", people2Schema).type(SchemaType.AVRO);
+        peopleSchemaBuilder.field("name").type(SchemaType.STRING);
+
+        // Build the outer schema, which embeds People1 and People2 as nested AVRO record fields.
+        SchemaInfo schemaInfo = peopleSchemaBuilder.build(SchemaType.AVRO);
+
+        GenericSchema<GenericRecord> peopleSchema = Schema.generic(schemaInfo);
+        GenericRecordBuilder peopleRecordBuilder = peopleSchema.newRecordBuilder();
+        peopleRecordBuilder.set(peopleSchema.getFields().get(0), people1GenericRecord);
+        peopleRecordBuilder.set(peopleSchema.getFields().get(1), people2GenericRecord);
+        peopleRecordBuilder.set(peopleSchema.getFields().get(2), "people");
+        GenericRecord peopleRecord = peopleRecordBuilder.build();
+
+        byte[] peopleEncode = peopleSchema.encode(peopleRecord);
+        // Decode the same bytes with a POJO-based AVRO schema built from People.class.
+        Schema<People> peopleDecodeSchema = Schema.AVRO(
+                SchemaDefinition.<People>builder().withPojo(People.class).withAlwaysAllowNull(false).build());
+        People people = peopleDecodeSchema.decode(peopleEncode);
+        // The POJO's fields must match the values that were set on the generic records.
+        assertEquals(people.name, peopleRecord.getField("name"));
+        assertEquals(people.getPeople1().age, people1GenericRecord.getField("age"));
+        assertEquals(people.getPeople1().height, people1GenericRecord.getField("height"));
+        assertEquals(people.getPeople1().name, people1GenericRecord.getField("name"));
+        assertEquals(people.getPeople2().age, people2GenericRecord.getField("age"));
+        assertEquals(people.getPeople2().height, people2GenericRecord.getField("height"));
+        assertEquals(people.getPeople2().name, people2GenericRecord.getField("name"));
+    }
 }