You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/05/22 16:23:43 UTC

[pulsar] branch master updated: [schema] AutoConsume should use the schema associated with messages as both writer and reader schema (#4325)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bf06ef3  [schema] AutoConsume should use the schema associated with messages as both writer and reader schema (#4325)
bf06ef3 is described below

commit bf06ef3ebeccfe758a3db1724bac62dd99a060de
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu May 23 00:23:37 2019 +0800

    [schema] AutoConsume should use the schema associated with messages as both writer and reader schema (#4325)
    
    * [schema] AutoConsume should use the schema associated with messages for both writer and reader schema
    
    *Motivation*
    
    AutoConsume should use the schema associated with the messages for decoding the messages.
    
    *Modifications*
    
    - provide a flag to enable or disable using the provided schema as the reader schema
    - for AUTO_CONSUME schema, disable using the provided schema as the reader schema, so it can use the right
      schema version for decoding messages into the right generic records
    - provide a few util methods for displaying schema data
    
    * Handle 64 bytes schema version
    
    * Addressed review comments
---
 .../apache/pulsar/common/schema/SchemaInfo.java    |  18 +++
 .../pulsar/client/impl/PulsarClientImpl.java       |  11 +-
 .../pulsar/client/impl/schema/AvroSchema.java      |   9 +-
 .../pulsar/client/impl/schema/JSONSchema.java      |   1 -
 .../pulsar/client/impl/schema/ProtobufSchema.java  |   4 +-
 .../pulsar/client/impl/schema/SchemaUtils.java     |  22 +++-
 .../pulsar/client/impl/schema/StructSchema.java    |   6 +
 .../impl/schema/generic/GenericAvroSchema.java     |  23 +++-
 .../impl/schema/generic/GenericJsonSchema.java     |  29 ++++-
 .../impl/schema/generic/GenericSchemaImpl.java     |  17 ++-
 .../integration/cli/SchemaUpdateStrategyTest.java  | 140 ++++++++++++++++++---
 11 files changed, 245 insertions(+), 35 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 87d0e15..c5d1b72 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.common.schema;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Base64;
 import java.util.Collections;
 import java.util.Map;
 
@@ -52,4 +55,19 @@ public class SchemaInfo {
      * Additional properties of the schema definition (implementation defined)
      */
     private Map<String, String> properties = Collections.emptyMap();
+
+    public String getSchemaDefinition() {
+        if (null == schema) {
+            return "";
+        }
+
+        switch (type) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF:
+                return new String(schema, UTF_8);
+            default:
+                return Base64.getEncoder().encodeToString(schema);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 73a4a3c..57d1956 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -730,13 +730,16 @@ public class PulsarClientImpl implements PulsarClient {
 
             if (schema instanceof AutoConsumeSchema) {
                 SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema();
-                if (schemaInfo.getType() != SchemaType.AVRO){
+                if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
                     throw new RuntimeException("Currently schema detection only works for topics with avro schemas");
-
                 }
-                GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfoProvider.getLatestSchema());
+
+                // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
+                // to decode the messages.
+                GenericSchema genericSchema = GenericSchemaImpl.of(
+                    schemaInfoProvider.getLatestSchema(), false /*useProvidedSchemaAsReaderSchema*/);
                 log.info("Auto detected schema for topic {} : {}",
-                        topicName, new String(schemaInfo.getSchema(), UTF_8));
+                        topicName, schemaInfo.getSchemaDefinition());
                 ((AutoConsumeSchema) schema).setSchema(genericSchema);
             }
             schema.setSchemaInfoProvider(schemaInfoProvider);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 14021f2..2944e14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -102,11 +102,16 @@ public class AvroSchema<T> extends StructSchema<T> {
     protected SchemaReader<T> loadReader(byte[] schemaVersion) {
         SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
         if (schemaInfo != null) {
-            return new AvroReader<>(parseAvroSchema(new String(schemaInfo.getSchema())), schema);
+            log.info("Load schema reader for version({}), schema is : {}",
+                SchemaUtils.getStringSchemaVersion(schemaVersion),
+                schemaInfo.getSchemaDefinition());
+            return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
         } else {
+            log.warn("No schema found for version({}), use latest schema : {}",
+                SchemaUtils.getStringSchemaVersion(schemaVersion),
+                this.schemaInfo.getSchemaDefinition());
             return reader;
         }
     }
 
-
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 4646465..04b249c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -27,7 +27,6 @@ import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.api.schema.SchemaWriter;
 import org.apache.pulsar.client.impl.schema.reader.JsonReader;
 import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
 import org.apache.pulsar.common.schema.SchemaInfo;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 9328908..b40f2ef 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -27,7 +27,6 @@ import lombok.Getter;
 import org.apache.avro.protobuf.ProtobufData;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.api.schema.SchemaWriter;
 import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
 import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -96,7 +95,8 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
 
     @Override
     protected SchemaReader<T> loadReader(byte[] schemaVersion) {
-        throw new RuntimeException("ProtobufSchema don't support schema versioning");    }
+        throw new RuntimeException("ProtobufSchema don't support schema versioning");
+    }
 
     public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
         return of(pojo, new HashMap<>());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
index dce7192..7ffdd18 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
@@ -22,15 +22,17 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
 
 /**
  * Utils for schemas.
  */
-final class SchemaUtils {
+public final class SchemaUtils {
 
     private SchemaUtils() {}
 
@@ -149,4 +151,22 @@ final class SchemaUtils {
         }
     }
 
+    public static String getStringSchemaVersion(byte[] schemaVersionBytes) {
+        if (null == schemaVersionBytes) {
+            return "NULL";
+        } else if (
+            // the length of schema version is 8 bytes post 2.4.0
+            schemaVersionBytes.length == Long.BYTES
+            // the length of schema version is 64 bytes before 2.4.0
+            || schemaVersionBytes.length == Long.SIZE) {
+            ByteBuffer bb = ByteBuffer.wrap(schemaVersionBytes);
+            return String.valueOf(bb.getLong());
+        } else if (schemaVersionBytes.length == 0) {
+            return "EMPTY";
+        } else {
+            return Base64.getEncoder().encodeToString(schemaVersionBytes);
+        }
+
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 89427d4..dd618ae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -131,6 +131,12 @@ public abstract class StructSchema<T> implements Schema<T> {
         this.schemaInfoProvider = schemaInfoProvider;
     }
 
+    /**
+     * Load the schema reader for reading messages encoded by the given schema version.
+     *
+     * @param schemaVersion the provided schema version
+     * @return the schema reader for decoding messages encoded by the provided schema version.
+     */
     protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);
 
     protected void setWriter(SchemaWriter<T> writer) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 82eb119..2fa6829 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -18,18 +18,27 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
 import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
  * A generic avro schema.
  */
+@Slf4j
 public class GenericAvroSchema extends GenericSchemaImpl {
 
     public GenericAvroSchema(SchemaInfo schemaInfo) {
-        super(schemaInfo);
+        this(schemaInfo, true);
+    }
+
+    GenericAvroSchema(SchemaInfo schemaInfo,
+                      boolean useProvidedSchemaAsReaderSchema) {
+        super(schemaInfo, useProvidedSchemaAsReaderSchema);
         setReader(new GenericAvroReader(schema));
         setWriter(new GenericAvroWriter(schema));
     }
@@ -48,11 +57,19 @@ public class GenericAvroSchema extends GenericSchemaImpl {
     protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
          SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
          if (schemaInfo != null) {
+             log.info("Load schema reader for version({}), schema is : {}",
+                 SchemaUtils.getStringSchemaVersion(schemaVersion),
+                 schemaInfo.getSchemaDefinition());
+             Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
+             Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
              return new GenericAvroReader(
-                     parseAvroSchema(new String(schemaInfo.getSchema())),
-                     schema,
+                     writerSchema,
+                     readerSchema,
                      schemaVersion);
          } else {
+             log.warn("No schema found for version({}), use latest schema : {}",
+                 SchemaUtils.getStringSchemaVersion(schemaVersion),
+                 this.schemaInfo.getSchemaDefinition());
              return reader;
          }
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 14d939f..3b82591 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -20,21 +20,28 @@ package org.apache.pulsar.client.impl.schema.generic;
 
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
 import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
 import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 /**
  * A generic json schema.
  */
+@Slf4j
 class GenericJsonSchema extends GenericSchemaImpl {
 
     public GenericJsonSchema(SchemaInfo schemaInfo) {
-        super(schemaInfo);
+        this(schemaInfo, true);
+    }
+
+    GenericJsonSchema(SchemaInfo schemaInfo,
+                      boolean useProvidedSchemaAsReaderSchema) {
+        super(schemaInfo, useProvidedSchemaAsReaderSchema);
         setWriter(new GenericJsonWriter());
         setReader(new GenericJsonReader(fields));
     }
@@ -43,12 +50,24 @@ class GenericJsonSchema extends GenericSchemaImpl {
     protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
         SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
         if (schemaInfo != null) {
+            log.info("Load schema reader for version({}), schema is : {}",
+                SchemaUtils.getStringSchemaVersion(schemaVersion),
+                schemaInfo.getSchemaDefinition());
+            Schema readerSchema;
+            if (useProvidedSchemaAsReaderSchema) {
+                readerSchema = schema;
+            } else {
+                readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
+            }
             return new GenericJsonReader(schemaVersion,
-                    (parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8)).getFields()
+                    readerSchema.getFields()
                             .stream()
                             .map(f -> new Field(f.name(), f.pos()))
-                            .collect(Collectors.toList())));
+                            .collect(Collectors.toList()));
         } else {
+            log.warn("No schema found for version({}), use latest schema : {}",
+                SchemaUtils.getStringSchemaVersion(schemaVersion),
+                this.schemaInfo.getSchemaDefinition());
             return reader;
         }
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index a8aa283..f22c449 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -33,14 +33,20 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> implements GenericSchema<GenericRecord> {
 
     protected final List<Field> fields;
+    // the flag controls whether to use the provided schema as reader schema
+    // to decode the messages. In `AUTO_CONSUME` mode, setting this flag to `false`
+    // allows decoding the messages using the schema associated with the messages.
+    protected final boolean useProvidedSchemaAsReaderSchema;
 
-    protected GenericSchemaImpl(SchemaInfo schemaInfo) {
+    protected GenericSchemaImpl(SchemaInfo schemaInfo,
+                                boolean useProvidedSchemaAsReaderSchema) {
         super(schemaInfo);
 
         this.fields = schema.getFields()
                 .stream()
                 .map(f -> new Field(f.name(), f.pos()))
                 .collect(Collectors.toList());
+        this.useProvidedSchemaAsReaderSchema = useProvidedSchemaAsReaderSchema;
     }
 
     @Override
@@ -55,11 +61,16 @@ public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> impl
      * @return a generic schema instance
      */
     public static GenericSchemaImpl of(SchemaInfo schemaInfo) {
+        return of(schemaInfo, true);
+    }
+
+    public static GenericSchemaImpl of(SchemaInfo schemaInfo,
+                                       boolean useProvidedSchemaAsReaderSchema) {
         switch (schemaInfo.getType()) {
             case AVRO:
-                return new GenericAvroSchema(schemaInfo);
+                return new GenericAvroSchema(schemaInfo, useProvidedSchemaAsReaderSchema);
             case JSON:
-                return new GenericJsonSchema(schemaInfo);
+                return new GenericJsonSchema(schemaInfo, useProvidedSchemaAsReaderSchema);
             default:
                 throw new UnsupportedOperationException("Generic schema is not supported on schema type "
                     + schemaInfo.getType() + "'");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
index 5a3ca47..e5f368f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
@@ -18,17 +18,22 @@
  */
 package org.apache.pulsar.tests.integration.cli;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+
+import static org.testng.Assert.assertEquals;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.avro.reflect.AvroAlias;
 import org.apache.avro.reflect.AvroDefault;
 
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
@@ -58,8 +63,9 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
 
         try (PulsarClient pulsarClient = PulsarClient.builder()
                 .serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
+            V1Data v1Data = new V1Data("test1", 1);
             try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
-                p.send(new V1Data("test1", 1));
+                p.send(v1Data);
             }
 
             log.info("try with forward compat, should fail");
@@ -70,8 +76,25 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
             }
 
             log.info("try with backward compat, should succeed");
+            V2Data v2Data = new V2Data("test2");
             try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
-                p.send(new V2Data("test2"));
+                p.send(v2Data);
+            }
+
+            Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+            try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
+                 .topic(topicName)
+                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                 .subscriptionName("sub")
+                 .subscribe()
+            ) {
+                log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
+
+                Message<GenericRecord> msg1 = consumer.receive();
+                v1Data.assertEqualToRecord(msg1.getValue());
+
+                Message<GenericRecord> msg2 = consumer.receive();
+                v2Data.assertEqualToRecord(msg2.getValue());
             }
         }
     }
@@ -85,18 +108,40 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
 
         try (PulsarClient pulsarClient = PulsarClient.builder()
                 .serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
+            V1Data v1Data = new V1Data("test1", 1);
             try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
-                p.send(new V1Data("test1", 1));
+                p.send(v1Data);
             }
 
             log.info("try with forward compat, should succeed");
+            V3Data v3Data = new V3Data("test3", 1, 2);
             try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
-                p.send(new V3Data("test3", 1, 2));
+                p.send(v3Data);
             }
 
             log.info("try with backward compat, should succeed");
+            V2Data v2Data = new V2Data("test2");
             try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
-                p.send(new V2Data("test2"));
+                p.send(v2Data);
+            }
+
+            Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+            try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
+                 .topic(topicName)
+                 .subscriptionName("sub")
+                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                 .subscribe()
+            ) {
+                log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
+
+                Message<GenericRecord> msg1 = consumer.receive();
+                v1Data.assertEqualToRecord(msg1.getValue());
+
+                Message<GenericRecord> msg2 = consumer.receive();
+                v3Data.assertEqualToRecord(msg2.getValue());
+
+                Message<GenericRecord> msg3 = consumer.receive();
+                v2Data.assertEqualToRecord(msg3.getValue());
             }
         }
     }
@@ -111,8 +156,9 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
         try (PulsarClient pulsarClient = PulsarClient.builder()
              .serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
 
+            V1Data v1Data = new V1Data("test1", 1);
             try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
-                p.send(new V1Data("test1", 1));
+                p.send(v1Data);
             }
 
             log.info("try with backward compat, should fail");
@@ -123,13 +169,25 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
             }
 
             log.info("try with forward compat, should succeed");
+            V3Data v3Data = new V3Data("test2", 1, 2);
             try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
-                p.send(new V3Data("test2", 1, 2));
+                p.send(v3Data);
             }
 
-            log.info("try with fully compat, should succeed");
-            try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
-                p.send(new V4Data("test2", 1, (short)100));
+            Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+            try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
+                 .topic(topicName)
+                 .subscriptionName("sub")
+                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                 .subscribe()
+            ) {
+                log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
+
+                Message<GenericRecord> msg1 = consumer.receive();
+                v1Data.assertEqualToRecord(msg1.getValue());
+
+                Message<GenericRecord> msg2 = consumer.receive();
+                v3Data.assertEqualToRecord(msg2.getValue());
             }
         }
 
@@ -142,8 +200,10 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
 
         try (PulsarClient pulsarClient = PulsarClient.builder()
              .serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
+
+            V1Data v1Data = new V1Data("test1", 1);
             try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
-                p.send(new V1Data("test1", 1));
+                p.send(v1Data);
             }
 
             log.info("try with backward compat only, should fail");
@@ -161,8 +221,25 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
             }
 
             log.info("try with fully compat");
+            V4Data v4Data = new V4Data("test2", 1, (short) 100);
             try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
-                p.send(new V4Data("test2", 1, (short)100));
+                p.send(v4Data);
+            }
+
+            Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+            try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
+                 .topic(topicName)
+                 .subscriptionName("sub")
+                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                 .subscribe()
+            ) {
+                log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
+
+                Message<GenericRecord> msg1 = consumer.receive();
+                v1Data.assertEqualToRecord(msg1.getValue());
+
+                Message<GenericRecord> msg2 = consumer.receive();
+                v4Data.assertEqualToRecord(msg2.getValue());
             }
         }
     }
@@ -204,7 +281,7 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
             ObjectMapper mapper = new ObjectMapper();
             Map<String, String> schema = new HashMap<>();
             schema.put("type", "AVRO");
-            schema.put("schema", new String(Schema.AVRO(V4Data.class).getSchemaInfo().getSchema(), UTF_8));
+            schema.put("schema", Schema.AVRO(V4Data.class).getSchemaInfo().getSchemaDefinition());
             BrokerContainer b = pulsarCluster.getAnyBroker();
             String schemaFile = String.format("/tmp/schema-%s", UUID.randomUUID().toString());
             b.putFile(schemaFile, mapper.writeValueAsBytes(schema));
@@ -236,6 +313,14 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
             this.foo = foo;
             this.bar = bar;
         }
+
+        void assertEqualToRecord(GenericRecord record) {
+            assertEquals(
+                2, record.getFields().size(),
+                record.getFields().size() + " fields in found : " + record.getFields());
+            assertEquals(foo, record.getField("foo"));
+            assertEquals(Integer.valueOf(bar), record.getField("bar"));
+        }
     }
 
     // backward compatible with V1Data
@@ -246,6 +331,13 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
         V2Data(String foo) {
             this.foo = foo;
         }
+
+        void assertEqualToRecord(GenericRecord record) {
+            assertEquals(
+                1, record.getFields().size(),
+                record.getFields().size() + " fields in found : " + record.getFields());
+            assertEquals(foo, record.getField("foo"));
+        }
     }
 
     // forward compatible with V1Data
@@ -260,6 +352,15 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
             this.bar = bar;
             this.baz = baz;
         }
+
+        void assertEqualToRecord(GenericRecord record) {
+            assertEquals(
+                3, record.getFields().size(),
+                record.getFields().size() + " fields in found : " + record.getFields());
+            assertEquals(foo, record.getField("foo"));
+            assertEquals(Integer.valueOf(bar), record.getField("bar"));
+            assertEquals(Long.valueOf(baz), record.getField("baz"));
+        }
     }
 
     // fully compatible with V1Data
@@ -275,6 +376,17 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
             this.bar = bar;
             this.blah = blah;
         }
+
+        void assertEqualToRecord(GenericRecord record) {
+            assertEquals(
+                3, record.getFields().size(),
+                record.getFields().size() + " fields in found : " + record.getFields());
+            assertEquals(foo, record.getField("foo"));
+            assertEquals(Integer.valueOf(bar), record.getField("bar"));
+            // NOTE: in generic record, Avro returns an Integer. We can consider improving
+            // the behavior in future to reflect the right Java class.
+            assertEquals(Integer.valueOf(blah), record.getField("blah"));
+        }
     }
 
     @Test