You are viewing a plain text version of this content. The canonical link was a hyperlink in the original page and is not reproduced in this plain text copy.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:26 UTC

[pulsar] 32/46: [Java client] Fix behaviour of Schema.AUTO_CONSUME() with KeyValueSchema and multi versions (#10492)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1264c47622366c4a9fe86e5999f511f1c8816473
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Fri May 7 22:56:37 2021 +0200

    [Java client] Fix behaviour of Schema.AUTO_CONSUME() with KeyValueSchema and multi versions (#10492)
    
    Co-authored-by: Enrico Olivelli <eo...@apache.org>
    (cherry picked from commit 9b92dc26a6b7fad9e1406ecfb010069119fab7bc)
---
 .../apache/pulsar/client/api/SimpleSchemaTest.java | 381 ++++++++++++++++-----
 .../client/impl/schema/AutoConsumeSchema.java      |   2 +-
 2 files changed, 293 insertions(+), 90 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index f670ef1..ae44311 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -29,8 +29,11 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.Schema.Parser;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
@@ -58,16 +61,19 @@ import java.io.ByteArrayInputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = "broker-api")
+@Slf4j
 public class SimpleSchemaTest extends ProducerConsumerBase {
 
     @DataProvider(name = "batchingModes")
     public static Object[][] batchingModes() {
         return new Object[][] {
-            { true },
-            { false }
+                { true },
+                { false }
         };
     }
 
@@ -110,7 +116,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 .topic("persistent://my-property/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
              Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic1").create()) {
+                     .topic("persistent://my-property/my-ns/my-topic1").create()) {
             int N = 10;
 
             for (int i = 0; i < N; i++) {
@@ -202,14 +208,14 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 p.send("junkdata".getBytes(UTF_8));
             } else {
                 Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema"
-                    + " if SchemaValidationEnabled is enabled");
+                        + " if SchemaValidationEnabled is enabled");
             }
         } catch (PulsarClientException e) {
             if (schemaValidationEnforced) {
                 Assert.assertTrue(e instanceof IncompatibleSchemaException);
             } else {
                 Assert.fail("Shouldn't throw IncompatibleSchemaException"
-                    + " if SchemaValidationEnforced is disabled");
+                        + " if SchemaValidationEnforced is disabled");
             }
         }
 
@@ -233,10 +239,10 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         AvroWriter<V2Data> v2Writer = new AvroWriter<>(
                 new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
         try (Producer<V1Data> ignored = pulsarClient.newProducer(v1Schema)
-                                                    .topic(topic).create()) {
+                .topic(topic).create()) {
         }
         try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
-                                              .topic(topic).create()) {
+                .topic(topic).create()) {
             p.send(new V2Data(-1, -1));
         }
         V1Data dataV1 = new V1Data(2);
@@ -244,14 +250,14 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         byte[] contentV1 = v1Writer.write(dataV1);
         byte[] contentV2 = v2Writer.write(dataV2);
         try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
-                                              .topic(topic).create();
-                Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
-                                                 .topic(topic)
-                                                 .subscriptionName("sub1").subscribe()) {
+                .topic(topic).create();
+             Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
+                     .topic(topic)
+                     .subscriptionName("sub1").subscribe()) {
             Assert.expectThrows(SchemaSerializationException.class, () -> p.send(contentV1));
 
             p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
-             .value(contentV1).send();
+                    .value(contentV1).send();
             p.send(contentV2);
             Message<V2Data> msg1 = c.receive();
             V2Data msg1Value = msg1.getValue();
@@ -267,7 +273,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 p.newMessage(Schema.BYTES).value(contentV1).send();
                 if (schemaValidationEnforced) {
                     Assert.fail("Shouldn't be able to send to a schema'd topic with no schema"
-                                        + " if SchemaValidationEnabled is enabled");
+                            + " if SchemaValidationEnabled is enabled");
                 }
                 Message<V2Data> msg3 = c.receive();
                 Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
@@ -276,7 +282,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                     Assert.assertTrue(e instanceof IncompatibleSchemaException);
                 } else {
                     Assert.fail("Shouldn't throw IncompatibleSchemaException"
-                                        + " if SchemaValidationEnforced is disabled");
+                            + " if SchemaValidationEnforced is disabled");
                 }
             }
         }
@@ -289,10 +295,10 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         V2Data data2 = new V2Data(3, 5);
         V1Data data3 = new V1Data(8);
         try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
-                                              .topic(topic).create();
+                .topic(topic).create();
              Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
-                                              .topic(topic)
-                                              .subscriptionName("sub1").subscribe()) {
+                     .topic(topic)
+                     .subscriptionName("sub1").subscribe()) {
             p.newMessage().value(data1).send();
             p.newMessage(Schema.AVRO(V2Data.class)).value(data2).send();
             p.newMessage(Schema.AVRO(V1Data.class)).value(data3).send();
@@ -326,10 +332,10 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         AvroWriter<V2Data> v2Writer = new AvroWriter<>(
                 new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
         try (Producer<byte[]> p = pulsarClient.newProducer()
-                                              .topic(topic).create();
+                .topic(topic).create();
              Consumer<byte[]> c = pulsarClient.newConsumer()
-                                              .topic(topic)
-                                              .subscriptionName("sub1").subscribe()) {
+                     .topic(topic)
+                     .subscriptionName("sub1").subscribe()) {
             for (int i = 0; i < 2; ++i) {
                 V1Data dataV1 = new V1Data(i);
                 V2Data dataV2 = new V2Data(i, -i);
@@ -348,19 +354,19 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
 
         List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
         Assert.assertEquals(allSchemas, Arrays.asList(v1Schema.getSchemaInfo(),
-                                                      v2Schema.getSchemaInfo()));
+                v2Schema.getSchemaInfo()));
     }
 
     @Test
     public void newProducerForMessageSchemaWithBatch() throws Exception {
         String topic = "my-property/my-ns/schema-test";
         Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
-                                         .topic(topic)
-                                         .subscriptionName("sub1").subscribe();
+                .topic(topic)
+                .subscriptionName("sub1").subscribe();
         Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
-                                         .topic(topic)
-                                         .enableBatching(true)
-                                         .batchingMaxPublishDelay(10, TimeUnit.SECONDS).create();
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(10, TimeUnit.SECONDS).create();
         AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
                 ReflectData.AllowNull.get().getSchema(V1Data.class));
         AvroWriter<V2Data> v2DataAvroWriter = new AvroWriter<>(
@@ -374,17 +380,17 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
             if (i / batch % 2 == 0) {
                 byte[] content = v1DataAvroWriter.write(new V1Data(i));
                 p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
-                 .value(content).sendAsync();
+                        .value(content).sendAsync();
             } else {
                 byte[] content = v2DataAvroWriter.write(new V2Data(i, i + total));
                 p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V2Data.class)))
-                 .value(content).sendAsync();
+                        .value(content).sendAsync();
             }
             if ((i + 1) % incompatible == 0) {
                 byte[] content = incompatibleDataAvroWriter.write(new IncompatibleData(-i, -i));
                 try {
                     p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(IncompatibleData.class)))
-                     .value(content).send();
+                            .value(content).send();
                 } catch (Exception e) {
                     Assert.assertTrue(e instanceof IncompatibleSchemaException, e.getMessage());
                 }
@@ -409,11 +415,11 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
                 ReflectData.AllowNull.get().getSchema(V1Data.class));
         try (Producer<byte[]> p = pulsarClient.newProducer()
-                                              .topic(topic)
-                                              .enableMultiSchema(false).create()) {
+                .topic(topic)
+                .enableMultiSchema(false).create()) {
             Assert.assertThrows(InvalidMessageException.class,
                     () -> p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
-                           .value(v1DataAvroWriter.write(new V1Data(0))).send());
+                            .value(v1DataAvroWriter.write(new V1Data(0))).send());
         }
     }
 
@@ -449,7 +455,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
 
         try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topic).create();
              Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
-                .topic(topic).subscriptionName("sub1").subscribe()) {
+                     .topic(topic).subscriptionName("sub1").subscribe()) {
             V1Data toSend = new V1Data(1);
             p.send(toSend);
             Assert.assertEquals(toSend, c.receive().getValue());
@@ -481,14 +487,14 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         String topic = "my-property/my-ns/schema-test";
 
         try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
-             .topic(topic)
-             .enableBatching(batching)
-             .create();
+                .topic(topic)
+                .enableBatching(batching)
+                .create();
              Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
-             .topic(topic)
-             .subscriptionName("sub1")
-             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-             .subscribe()) {
+                     .topic(topic)
+                     .subscriptionName("sub1")
+                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                     .subscribe()) {
 
             p.send(new V1Data(1));
             Message<V1Data> data = c.receive();
@@ -502,14 +508,14 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         String topic = "my-property/my-ns/schema-test-auto-consume-" + batching;
 
         try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
-             .topic(topic)
-             .enableBatching(batching)
-             .create();
+                .topic(topic)
+                .enableBatching(batching)
+                .create();
              Consumer<GenericRecord> c = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
-             .topic(topic)
-             .subscriptionName("sub1")
-             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-             .subscribe()) {
+                     .topic(topic)
+                     .subscriptionName("sub1")
+                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                     .subscribe()) {
 
             int numMessages = 10;
 
@@ -531,50 +537,55 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         String topic = "my-property/my-ns/schema-test-auto-keyvalue-consume-" + batching;
 
         Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
-            Schema.AVRO(V1Data.class),
-            Schema.AVRO(V1Data.class),
-            KeyValueEncodingType.SEPARATED);
+                Schema.AVRO(V1Data.class),
+                Schema.AVRO(V1Data.class),
+                KeyValueEncodingType.SEPARATED);
 
         try (Producer<KeyValue<V1Data, V1Data>> p = pulsarClient.newProducer(pojoSchema)
-             .topic(topic)
-             .enableBatching(batching)
-             .create();
+                .topic(topic)
+                .enableBatching(batching)
+                .create();
+             Consumer<GenericRecord> c0 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                     .topic(topic)
+                     .subscriptionName("sub0")
+                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                     .subscribe();
              Consumer<KeyValue<GenericRecord, GenericRecord>> c1 = pulsarClient.newConsumer(
-                 Schema.KeyValue(
-                     Schema.AUTO_CONSUME(),
-                     Schema.AUTO_CONSUME(),
-                     KeyValueEncodingType.SEPARATED))
-             .topic(topic)
-             .subscriptionName("sub1")
-             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-             .subscribe();
+                     Schema.KeyValue(
+                             Schema.AUTO_CONSUME(),
+                             Schema.AUTO_CONSUME(),
+                             KeyValueEncodingType.SEPARATED))
+                     .topic(topic)
+                     .subscriptionName("sub1")
+                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                     .subscribe();
              Consumer<KeyValue<V1Data, V1Data>> c2 = pulsarClient.newConsumer(
-                 Schema.KeyValue(
-                     Schema.AVRO(V1Data.class),
-                     Schema.AVRO(V1Data.class),
-                     KeyValueEncodingType.SEPARATED))
-             .topic(topic)
-             .subscriptionName("sub2")
-             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-             .subscribe();
+                     Schema.KeyValue(
+                             Schema.AVRO(V1Data.class),
+                             Schema.AVRO(V1Data.class),
+                             KeyValueEncodingType.SEPARATED))
+                     .topic(topic)
+                     .subscriptionName("sub2")
+                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                     .subscribe();
              Consumer<KeyValue<GenericRecord, V1Data>> c3 = pulsarClient.newConsumer(
-                 Schema.KeyValue(
-                     Schema.AUTO_CONSUME(),
-                     Schema.AVRO(V1Data.class),
-                     KeyValueEncodingType.SEPARATED))
-             .topic(topic)
-             .subscriptionName("sub3")
-             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-             .subscribe();
+                     Schema.KeyValue(
+                             Schema.AUTO_CONSUME(),
+                             Schema.AVRO(V1Data.class),
+                             KeyValueEncodingType.SEPARATED))
+                     .topic(topic)
+                     .subscriptionName("sub3")
+                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                     .subscribe();
              Consumer<KeyValue<V1Data, GenericRecord>> c4 = pulsarClient.newConsumer(
-                 Schema.KeyValue(
-                     Schema.AVRO(V1Data.class),
-                     Schema.AUTO_CONSUME(),
-                     KeyValueEncodingType.SEPARATED))
-             .topic(topic)
-             .subscriptionName("sub4")
-             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-             .subscribe()
+                     Schema.KeyValue(
+                             Schema.AVRO(V1Data.class),
+                             Schema.AUTO_CONSUME(),
+                             KeyValueEncodingType.SEPARATED))
+                     .topic(topic)
+                     .subscriptionName("sub4")
+                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                     .subscribe()
         ) {
 
             int numMessages = 10;
@@ -584,12 +595,22 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
             }
             p.flush();
 
+            // verify c0
+            for (int i = 0; i < numMessages; i++) {
+                Message<GenericRecord> wrapper = c0.receive();
+                KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+                assertNotNull(wrapper.getSchemaVersion());
+                assertEquals(data.getKey().getField("i"), i * 100);
+                assertEquals(data.getValue().getField("i"), i * 1000);
+                c0.acknowledge(wrapper);
+            }
             // verify c1
             for (int i = 0; i < numMessages; i++) {
                 Message<KeyValue<GenericRecord, GenericRecord>> data = c1.receive();
                 assertNotNull(data.getSchemaVersion());
                 assertEquals(data.getValue().getKey().getField("i"), i * 100);
                 assertEquals(data.getValue().getValue().getField("i"), i * 1000);
+                c1.acknowledge(data);
             }
 
             // verify c2
@@ -598,6 +619,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertNotNull(data.getSchemaVersion());
                 assertEquals(data.getValue().getKey().i, i * 100);
                 assertEquals(data.getValue().getValue().i, i * 1000);
+                c2.acknowledge(data);
             }
 
             // verify c3
@@ -606,6 +628,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertNotNull(data.getSchemaVersion());
                 assertEquals(data.getValue().getKey().getField("i"), i * 100);
                 assertEquals(data.getValue().getValue().i, i * 1000);
+                c3.acknowledge(data);
             }
 
             // verify c4
@@ -614,6 +637,186 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertNotNull(data.getSchemaVersion());
                 assertEquals(data.getValue().getKey().i, i * 100);
                 assertEquals(data.getValue().getValue().getField("i"), i * 1000);
+                c4.acknowledge(data);
+            }
+        }
+
+        // new schema version
+
+        Schema<KeyValue<V2Data, V2Data>> pojoSchemaV2 = Schema.KeyValue(
+                Schema.AVRO(V2Data.class),
+                Schema.AVRO(V2Data.class),
+                KeyValueEncodingType.SEPARATED);
+
+        try (Producer<KeyValue<V2Data, V2Data>> p = pulsarClient.newProducer(pojoSchemaV2)
+                .topic(topic)
+                .enableBatching(batching)
+                .create();
+             Consumer<GenericRecord> c0 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                     .topic(topic)
+                     .subscriptionName("sub0")
+                     .subscribe();
+             Consumer<KeyValue<GenericRecord, GenericRecord>> c1 = pulsarClient.newConsumer(
+                     Schema.KeyValue(
+                             Schema.AUTO_CONSUME(),
+                             Schema.AUTO_CONSUME(),
+                             KeyValueEncodingType.SEPARATED))
+                     .topic(topic)
+                     .subscriptionName("sub1")
+                     .subscribe();
+             Consumer<KeyValue<V2Data, V2Data>> c2 = pulsarClient.newConsumer(
+                     Schema.KeyValue(
+                             Schema.AVRO(V2Data.class),
+                             Schema.AVRO(V2Data.class),
+                             KeyValueEncodingType.SEPARATED))
+                     .topic(topic)
+                     .subscriptionName("sub2")
+                     .subscribe();
+             Consumer<KeyValue<GenericRecord, V2Data>> c3 = pulsarClient.newConsumer(
+                     Schema.KeyValue(
+                             Schema.AUTO_CONSUME(),
+                             Schema.AVRO(V2Data.class),
+                             KeyValueEncodingType.SEPARATED))
+                     .topic(topic)
+                     .subscriptionName("sub3")
+                     .subscribe();
+             Consumer<KeyValue<V2Data, GenericRecord>> c4 = pulsarClient.newConsumer(
+                     Schema.KeyValue(
+                             Schema.AVRO(V2Data.class),
+                             Schema.AUTO_CONSUME(),
+                             KeyValueEncodingType.SEPARATED))
+                     .topic(topic)
+                     .subscriptionName("sub4")
+                     .subscribe()
+        ) {
+
+            int numMessages = 10;
+
+            for (int i = 0; i < numMessages; i++) {
+                p.sendAsync(new KeyValue<>(new V2Data(i * 100, i), new V2Data(i * 1000, i * 20)));
+            }
+            p.flush();
+
+            // verify c0
+            for (int i = 0; i < numMessages; i++) {
+                Message<GenericRecord> wrapper = c0.receive();
+                KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+                assertNotNull(wrapper.getSchemaVersion());
+                assertEquals(data.getKey().getField("i"), i * 100);
+                assertEquals(data.getValue().getField("i"), i * 1000);
+                assertEquals(data.getKey().getField("j"), i);
+                assertEquals(data.getValue().getField("j"), i * 20);
+            }
+            // verify c1
+            for (int i = 0; i < numMessages; i++) {
+                Message<KeyValue<GenericRecord, GenericRecord>> data = c1.receive();
+                assertNotNull(data.getSchemaVersion());
+                assertEquals(data.getValue().getKey().getField("i"), i * 100);
+                assertEquals(data.getValue().getValue().getField("i"), i * 1000);
+                assertEquals(data.getValue().getKey().getField("j"), i);
+                assertEquals(data.getValue().getValue().getField("j"), i * 20);
+            }
+
+            // verify c2
+            for (int i = 0; i < numMessages; i++) {
+                Message<KeyValue<V2Data, V2Data>> data = c2.receive();
+                assertNotNull(data.getSchemaVersion());
+                assertEquals(data.getValue().getKey().i, i * 100);
+                assertEquals(data.getValue().getValue().i, i * 1000);
+                assertEquals(data.getValue().getKey().j, (Integer) i);
+                assertEquals(data.getValue().getValue().j, (Integer) (i * 20));
+            }
+
+            // verify c3
+            for (int i = 0; i < numMessages; i++) {
+                Message<KeyValue<GenericRecord, V2Data>> data = c3.receive();
+                assertNotNull(data.getSchemaVersion());
+                assertEquals(data.getValue().getKey().getField("i"), i * 100);
+                assertEquals(data.getValue().getValue().i, i * 1000);
+                assertEquals(data.getValue().getKey().getField("j"), (Integer) i);
+                assertEquals(data.getValue().getValue().j, (Integer) (i * 20));
+            }
+
+            // verify c4
+            for (int i = 0; i < numMessages; i++) {
+                Message<KeyValue<V2Data, GenericRecord>> data = c4.receive();
+                assertNotNull(data.getSchemaVersion());
+                assertEquals(data.getValue().getKey().i, i * 100);
+                assertEquals(data.getValue().getValue().getField("i"), i * 1000);
+                assertEquals(data.getValue().getKey().j, (Integer) i);
+                assertEquals(data.getValue().getValue().getField("j"), i * 20);
+            }
+        }
+
+    }
+
+    @Test // One AUTO_CONSUME consumer must read KeyValue messages written with a V1 schema and then a V2 schema.
+    public void testAutoKeyValueConsumeGenericObject() throws Exception {
+        String topic = "my-property/my-ns/schema-test-auto-keyvalue-consume-"+ UUID.randomUUID();
+        // The random UUID suffix gives every run its own topic. Key and value are AVRO V1Data, SEPARATED encoding.
+        Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
+                Schema.AVRO(V1Data.class),
+                Schema.AVRO(V1Data.class),
+                KeyValueEncodingType.SEPARATED);
+        // c0 uses Schema.AUTO_CONSUME() and subscribes as "sub0" with initial position Earliest.
+        try (Producer<KeyValue<V1Data, V1Data>> p = pulsarClient.newProducer(pojoSchema)
+                .topic(topic)
+                .create();
+             Consumer<GenericRecord> c0 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                     .topic(topic)
+                     .subscriptionName("sub0")
+                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                     .subscribe();
+        ) {
+            // Phase 1: publish numMessages pairs with the V1 schema (key i * 100, value i * 1000).
+            int numMessages = 10;
+
+            for (int i = 0; i < numMessages; i++) {
+                p.sendAsync(new KeyValue<>(new V1Data(i * 100), new V1Data(i * 1000)));
+            }
+            p.flush(); // sends above are async; flush before starting to receive
+
+            // verify c0: the native object of each record is cast to a KeyValue of GenericRecords, then field "i" is checked
+            for (int i = 0; i < numMessages; i++) {
+                Message<GenericRecord> wrapper = c0.receive();
+                log.info("schema version {}", BytesSchemaVersion.of(wrapper.getSchemaVersion()));
+                KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+                assertNotNull(wrapper.getSchemaVersion());
+                assertEquals(data.getKey().getField("i"), i * 100);
+                assertEquals(data.getValue().getField("i"), i * 1000);
+                c0.acknowledge(wrapper);
+            }
+
+
+            // new schema version: V2Data key and value, published to the same topic while c0 is still open
+
+            Schema<KeyValue<V2Data, V2Data>> pojoSchemaV2 = Schema.KeyValue(
+                    Schema.AVRO(V2Data.class),
+                    Schema.AVRO(V2Data.class),
+                    KeyValueEncodingType.SEPARATED);
+
+            try (Producer<KeyValue<V2Data, V2Data>> p2 = pulsarClient.newProducer(pojoSchemaV2)
+                    .topic(topic)
+                    .create();
+            ) {
+                // Phase 2: key is V2Data(i * 100, i), value is V2Data(i * 1000, i * 20).
+                for (int i = 0; i < numMessages; i++) {
+                    p2.sendAsync(new KeyValue<>(new V2Data(i * 100, i), new V2Data(i * 1000, i * 20)));
+                }
+                p2.flush();
+
+                // verify c0 again: the same consumer must now expose both "i" and "j" on key and value
+                for (int i = 0; i < numMessages; i++) {
+                    Message<GenericRecord> wrapper = c0.receive();
+                    log.info("schema version {}", BytesSchemaVersion.of(wrapper.getSchemaVersion()));
+                    KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+                    assertNotNull(wrapper.getSchemaVersion());
+                    assertEquals(data.getKey().getField("i"), i * 100);
+                    assertEquals(data.getValue().getField("i"), i * 1000);
+                    assertEquals(data.getKey().getField("j"), i);
+                    assertEquals(data.getValue().getField("j"), i * 20);
+                }
+                // NOTE(review): the V2 messages are never acknowledged, unlike the V1 loop above.
             }
         }
     }
@@ -626,12 +829,12 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
         PulsarClientImpl binaryProtocolClient = (PulsarClientImpl) pulsarClient;
 
         pulsarClient.newProducer(Schema.AVRO(V1Data.class))
-            .topic(topic)
-            .create();
+                .topic(topic)
+                .create();
 
         pulsarClient.newProducer(Schema.AVRO(V2Data.class))
-            .topic(topic)
-            .create();
+                .topic(topic)
+                .create();
 
         LookupService httpLookupService = httpProtocolClient.getLookup();
         LookupService binaryLookupService = binaryProtocolClient.getLookup();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 07c807c..1e43fc4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -191,7 +191,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
                 return LocalDateTimeSchema.of();
             case JSON:
             case AVRO:
-                return GenericSchemaImpl.of(schemaInfo);
+                return GenericSchemaImpl.of(schemaInfo, false);
             case PROTOBUF_NATIVE:
                 return GenericProtobufNativeSchema.of(schemaInfo);
             case KEY_VALUE: