You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:26 UTC
[pulsar] 32/46: [Java client] Fix behaviour of
Schema.AUTO_CONSUME() with KeyValueSchema and multi versions (#10492)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1264c47622366c4a9fe86e5999f511f1c8816473
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Fri May 7 22:56:37 2021 +0200
[Java client] Fix behaviour of Schema.AUTO_CONSUME() with KeyValueSchema and multi versions (#10492)
Co-authored-by: Enrico Olivelli <eo...@apache.org>
(cherry picked from commit 9b92dc26a6b7fad9e1406ecfb010069119fab7bc)
---
.../apache/pulsar/client/api/SimpleSchemaTest.java | 381 ++++++++++++++++-----
.../client/impl/schema/AutoConsumeSchema.java | 2 +-
2 files changed, 293 insertions(+), 90 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index f670ef1..ae44311 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -29,8 +29,11 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.Schema.Parser;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
@@ -58,16 +61,19 @@ import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+@Test(groups = "broker-api")
+@Slf4j
public class SimpleSchemaTest extends ProducerConsumerBase {
@DataProvider(name = "batchingModes")
public static Object[][] batchingModes() {
return new Object[][] {
- { true },
- { false }
+ { true },
+ { false }
};
}
@@ -110,7 +116,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
.topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic1").create()) {
+ .topic("persistent://my-property/my-ns/my-topic1").create()) {
int N = 10;
for (int i = 0; i < N; i++) {
@@ -202,14 +208,14 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
p.send("junkdata".getBytes(UTF_8));
} else {
Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema"
- + " if SchemaValidationEnabled is enabled");
+ + " if SchemaValidationEnabled is enabled");
}
} catch (PulsarClientException e) {
if (schemaValidationEnforced) {
Assert.assertTrue(e instanceof IncompatibleSchemaException);
} else {
Assert.fail("Shouldn't throw IncompatibleSchemaException"
- + " if SchemaValidationEnforced is disabled");
+ + " if SchemaValidationEnforced is disabled");
}
}
@@ -233,10 +239,10 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
AvroWriter<V2Data> v2Writer = new AvroWriter<>(
new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
try (Producer<V1Data> ignored = pulsarClient.newProducer(v1Schema)
- .topic(topic).create()) {
+ .topic(topic).create()) {
}
try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
- .topic(topic).create()) {
+ .topic(topic).create()) {
p.send(new V2Data(-1, -1));
}
V1Data dataV1 = new V1Data(2);
@@ -244,14 +250,14 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
byte[] contentV1 = v1Writer.write(dataV1);
byte[] contentV2 = v2Writer.write(dataV2);
try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
- .topic(topic).create();
- Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
- .topic(topic)
- .subscriptionName("sub1").subscribe()) {
+ .topic(topic).create();
+ Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
+ .topic(topic)
+ .subscriptionName("sub1").subscribe()) {
Assert.expectThrows(SchemaSerializationException.class, () -> p.send(contentV1));
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
- .value(contentV1).send();
+ .value(contentV1).send();
p.send(contentV2);
Message<V2Data> msg1 = c.receive();
V2Data msg1Value = msg1.getValue();
@@ -267,7 +273,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
p.newMessage(Schema.BYTES).value(contentV1).send();
if (schemaValidationEnforced) {
Assert.fail("Shouldn't be able to send to a schema'd topic with no schema"
- + " if SchemaValidationEnabled is enabled");
+ + " if SchemaValidationEnabled is enabled");
}
Message<V2Data> msg3 = c.receive();
Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
@@ -276,7 +282,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
Assert.assertTrue(e instanceof IncompatibleSchemaException);
} else {
Assert.fail("Shouldn't throw IncompatibleSchemaException"
- + " if SchemaValidationEnforced is disabled");
+ + " if SchemaValidationEnforced is disabled");
}
}
}
@@ -289,10 +295,10 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
V2Data data2 = new V2Data(3, 5);
V1Data data3 = new V1Data(8);
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
- .topic(topic).create();
+ .topic(topic).create();
Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
- .topic(topic)
- .subscriptionName("sub1").subscribe()) {
+ .topic(topic)
+ .subscriptionName("sub1").subscribe()) {
p.newMessage().value(data1).send();
p.newMessage(Schema.AVRO(V2Data.class)).value(data2).send();
p.newMessage(Schema.AVRO(V1Data.class)).value(data3).send();
@@ -326,10 +332,10 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
AvroWriter<V2Data> v2Writer = new AvroWriter<>(
new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
try (Producer<byte[]> p = pulsarClient.newProducer()
- .topic(topic).create();
+ .topic(topic).create();
Consumer<byte[]> c = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("sub1").subscribe()) {
+ .topic(topic)
+ .subscriptionName("sub1").subscribe()) {
for (int i = 0; i < 2; ++i) {
V1Data dataV1 = new V1Data(i);
V2Data dataV2 = new V2Data(i, -i);
@@ -348,19 +354,19 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
Assert.assertEquals(allSchemas, Arrays.asList(v1Schema.getSchemaInfo(),
- v2Schema.getSchemaInfo()));
+ v2Schema.getSchemaInfo()));
}
@Test
public void newProducerForMessageSchemaWithBatch() throws Exception {
String topic = "my-property/my-ns/schema-test";
Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
- .topic(topic)
- .subscriptionName("sub1").subscribe();
+ .topic(topic)
+ .subscriptionName("sub1").subscribe();
Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
- .topic(topic)
- .enableBatching(true)
- .batchingMaxPublishDelay(10, TimeUnit.SECONDS).create();
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(10, TimeUnit.SECONDS).create();
AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
ReflectData.AllowNull.get().getSchema(V1Data.class));
AvroWriter<V2Data> v2DataAvroWriter = new AvroWriter<>(
@@ -374,17 +380,17 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
if (i / batch % 2 == 0) {
byte[] content = v1DataAvroWriter.write(new V1Data(i));
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
- .value(content).sendAsync();
+ .value(content).sendAsync();
} else {
byte[] content = v2DataAvroWriter.write(new V2Data(i, i + total));
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V2Data.class)))
- .value(content).sendAsync();
+ .value(content).sendAsync();
}
if ((i + 1) % incompatible == 0) {
byte[] content = incompatibleDataAvroWriter.write(new IncompatibleData(-i, -i));
try {
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(IncompatibleData.class)))
- .value(content).send();
+ .value(content).send();
} catch (Exception e) {
Assert.assertTrue(e instanceof IncompatibleSchemaException, e.getMessage());
}
@@ -409,11 +415,11 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
ReflectData.AllowNull.get().getSchema(V1Data.class));
try (Producer<byte[]> p = pulsarClient.newProducer()
- .topic(topic)
- .enableMultiSchema(false).create()) {
+ .topic(topic)
+ .enableMultiSchema(false).create()) {
Assert.assertThrows(InvalidMessageException.class,
() -> p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
- .value(v1DataAvroWriter.write(new V1Data(0))).send());
+ .value(v1DataAvroWriter.write(new V1Data(0))).send());
}
}
@@ -449,7 +455,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topic).create();
Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
- .topic(topic).subscriptionName("sub1").subscribe()) {
+ .topic(topic).subscriptionName("sub1").subscribe()) {
V1Data toSend = new V1Data(1);
p.send(toSend);
Assert.assertEquals(toSend, c.receive().getValue());
@@ -481,14 +487,14 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
String topic = "my-property/my-ns/schema-test";
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
- .topic(topic)
- .enableBatching(batching)
- .create();
+ .topic(topic)
+ .enableBatching(batching)
+ .create();
Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
- .topic(topic)
- .subscriptionName("sub1")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe()) {
+ .topic(topic)
+ .subscriptionName("sub1")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()) {
p.send(new V1Data(1));
Message<V1Data> data = c.receive();
@@ -502,14 +508,14 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
String topic = "my-property/my-ns/schema-test-auto-consume-" + batching;
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
- .topic(topic)
- .enableBatching(batching)
- .create();
+ .topic(topic)
+ .enableBatching(batching)
+ .create();
Consumer<GenericRecord> c = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
- .topic(topic)
- .subscriptionName("sub1")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe()) {
+ .topic(topic)
+ .subscriptionName("sub1")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()) {
int numMessages = 10;
@@ -531,50 +537,55 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
String topic = "my-property/my-ns/schema-test-auto-keyvalue-consume-" + batching;
Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
- Schema.AVRO(V1Data.class),
- Schema.AVRO(V1Data.class),
- KeyValueEncodingType.SEPARATED);
+ Schema.AVRO(V1Data.class),
+ Schema.AVRO(V1Data.class),
+ KeyValueEncodingType.SEPARATED);
try (Producer<KeyValue<V1Data, V1Data>> p = pulsarClient.newProducer(pojoSchema)
- .topic(topic)
- .enableBatching(batching)
- .create();
+ .topic(topic)
+ .enableBatching(batching)
+ .create();
+ Consumer<GenericRecord> c0 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("sub0")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
Consumer<KeyValue<GenericRecord, GenericRecord>> c1 = pulsarClient.newConsumer(
- Schema.KeyValue(
- Schema.AUTO_CONSUME(),
- Schema.AUTO_CONSUME(),
- KeyValueEncodingType.SEPARATED))
- .topic(topic)
- .subscriptionName("sub1")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe();
+ Schema.KeyValue(
+ Schema.AUTO_CONSUME(),
+ Schema.AUTO_CONSUME(),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub1")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
Consumer<KeyValue<V1Data, V1Data>> c2 = pulsarClient.newConsumer(
- Schema.KeyValue(
- Schema.AVRO(V1Data.class),
- Schema.AVRO(V1Data.class),
- KeyValueEncodingType.SEPARATED))
- .topic(topic)
- .subscriptionName("sub2")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe();
+ Schema.KeyValue(
+ Schema.AVRO(V1Data.class),
+ Schema.AVRO(V1Data.class),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub2")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
Consumer<KeyValue<GenericRecord, V1Data>> c3 = pulsarClient.newConsumer(
- Schema.KeyValue(
- Schema.AUTO_CONSUME(),
- Schema.AVRO(V1Data.class),
- KeyValueEncodingType.SEPARATED))
- .topic(topic)
- .subscriptionName("sub3")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe();
+ Schema.KeyValue(
+ Schema.AUTO_CONSUME(),
+ Schema.AVRO(V1Data.class),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub3")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
Consumer<KeyValue<V1Data, GenericRecord>> c4 = pulsarClient.newConsumer(
- Schema.KeyValue(
- Schema.AVRO(V1Data.class),
- Schema.AUTO_CONSUME(),
- KeyValueEncodingType.SEPARATED))
- .topic(topic)
- .subscriptionName("sub4")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe()
+ Schema.KeyValue(
+ Schema.AVRO(V1Data.class),
+ Schema.AUTO_CONSUME(),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub4")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()
) {
int numMessages = 10;
@@ -584,12 +595,22 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
}
p.flush();
+ // verify c0
+ for (int i = 0; i < numMessages; i++) {
+ Message<GenericRecord> wrapper = c0.receive();
+ KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNotNull(wrapper.getSchemaVersion());
+ assertEquals(data.getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getField("i"), i * 1000);
+ c0.acknowledge(wrapper);
+ }
// verify c1
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<GenericRecord, GenericRecord>> data = c1.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().getField("i"), i * 1000);
+ c1.acknowledge(data);
}
// verify c2
@@ -598,6 +619,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
+ c2.acknowledge(data);
}
// verify c3
@@ -606,6 +628,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
+ c3.acknowledge(data);
}
// verify c4
@@ -614,6 +637,186 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().getField("i"), i * 1000);
+ c4.acknowledge(data);
+ }
+ }
+
+ // new schema version
+
+ Schema<KeyValue<V2Data, V2Data>> pojoSchemaV2 = Schema.KeyValue(
+ Schema.AVRO(V2Data.class),
+ Schema.AVRO(V2Data.class),
+ KeyValueEncodingType.SEPARATED);
+
+ try (Producer<KeyValue<V2Data, V2Data>> p = pulsarClient.newProducer(pojoSchemaV2)
+ .topic(topic)
+ .enableBatching(batching)
+ .create();
+ Consumer<GenericRecord> c0 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("sub0")
+ .subscribe();
+ Consumer<KeyValue<GenericRecord, GenericRecord>> c1 = pulsarClient.newConsumer(
+ Schema.KeyValue(
+ Schema.AUTO_CONSUME(),
+ Schema.AUTO_CONSUME(),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub1")
+ .subscribe();
+ Consumer<KeyValue<V2Data, V2Data>> c2 = pulsarClient.newConsumer(
+ Schema.KeyValue(
+ Schema.AVRO(V2Data.class),
+ Schema.AVRO(V2Data.class),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub2")
+ .subscribe();
+ Consumer<KeyValue<GenericRecord, V2Data>> c3 = pulsarClient.newConsumer(
+ Schema.KeyValue(
+ Schema.AUTO_CONSUME(),
+ Schema.AVRO(V2Data.class),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub3")
+ .subscribe();
+ Consumer<KeyValue<V2Data, GenericRecord>> c4 = pulsarClient.newConsumer(
+ Schema.KeyValue(
+ Schema.AVRO(V2Data.class),
+ Schema.AUTO_CONSUME(),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub4")
+ .subscribe()
+ ) {
+
+ int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++) {
+ p.sendAsync(new KeyValue<>(new V2Data(i * 100, i), new V2Data(i * 1000, i * 20)));
+ }
+ p.flush();
+
+ // verify c0
+ for (int i = 0; i < numMessages; i++) {
+ Message<GenericRecord> wrapper = c0.receive();
+ KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNotNull(wrapper.getSchemaVersion());
+ assertEquals(data.getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getField("i"), i * 1000);
+ assertEquals(data.getKey().getField("j"), i);
+ assertEquals(data.getValue().getField("j"), i * 20);
+ }
+ // verify c1
+ for (int i = 0; i < numMessages; i++) {
+ Message<KeyValue<GenericRecord, GenericRecord>> data = c1.receive();
+ assertNotNull(data.getSchemaVersion());
+ assertEquals(data.getValue().getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getValue().getField("i"), i * 1000);
+ assertEquals(data.getValue().getKey().getField("j"), i);
+ assertEquals(data.getValue().getValue().getField("j"), i * 20);
+ }
+
+ // verify c2
+ for (int i = 0; i < numMessages; i++) {
+ Message<KeyValue<V2Data, V2Data>> data = c2.receive();
+ assertNotNull(data.getSchemaVersion());
+ assertEquals(data.getValue().getKey().i, i * 100);
+ assertEquals(data.getValue().getValue().i, i * 1000);
+ assertEquals(data.getValue().getKey().j, (Integer) i);
+ assertEquals(data.getValue().getValue().j, (Integer) (i * 20));
+ }
+
+ // verify c3
+ for (int i = 0; i < numMessages; i++) {
+ Message<KeyValue<GenericRecord, V2Data>> data = c3.receive();
+ assertNotNull(data.getSchemaVersion());
+ assertEquals(data.getValue().getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getValue().i, i * 1000);
+ assertEquals(data.getValue().getKey().getField("j"), (Integer) i);
+ assertEquals(data.getValue().getValue().j, (Integer) (i * 20));
+ }
+
+ // verify c4
+ for (int i = 0; i < numMessages; i++) {
+ Message<KeyValue<V2Data, GenericRecord>> data = c4.receive();
+ assertNotNull(data.getSchemaVersion());
+ assertEquals(data.getValue().getKey().i, i * 100);
+ assertEquals(data.getValue().getValue().getField("i"), i * 1000);
+ assertEquals(data.getValue().getKey().j, (Integer) i);
+ assertEquals(data.getValue().getValue().getField("j"), i * 20);
+ }
+ }
+
+ }
+
+ @Test
+ public void testAutoKeyValueConsumeGenericObject() throws Exception {
+ String topic = "my-property/my-ns/schema-test-auto-keyvalue-consume-"+ UUID.randomUUID();
+
+ Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
+ Schema.AVRO(V1Data.class),
+ Schema.AVRO(V1Data.class),
+ KeyValueEncodingType.SEPARATED);
+
+ try (Producer<KeyValue<V1Data, V1Data>> p = pulsarClient.newProducer(pojoSchema)
+ .topic(topic)
+ .create();
+ Consumer<GenericRecord> c0 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("sub0")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ ) {
+
+ int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++) {
+ p.sendAsync(new KeyValue<>(new V1Data(i * 100), new V1Data(i * 1000)));
+ }
+ p.flush();
+
+ // verify c0
+ for (int i = 0; i < numMessages; i++) {
+ Message<GenericRecord> wrapper = c0.receive();
+ log.info("schema version {}", BytesSchemaVersion.of(wrapper.getSchemaVersion()));
+ KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNotNull(wrapper.getSchemaVersion());
+ assertEquals(data.getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getField("i"), i * 1000);
+ c0.acknowledge(wrapper);
+ }
+
+
+ // new schema version
+
+ Schema<KeyValue<V2Data, V2Data>> pojoSchemaV2 = Schema.KeyValue(
+ Schema.AVRO(V2Data.class),
+ Schema.AVRO(V2Data.class),
+ KeyValueEncodingType.SEPARATED);
+
+ try (Producer<KeyValue<V2Data, V2Data>> p2 = pulsarClient.newProducer(pojoSchemaV2)
+ .topic(topic)
+ .create();
+ ) {
+
+ for (int i = 0; i < numMessages; i++) {
+ p2.sendAsync(new KeyValue<>(new V2Data(i * 100, i), new V2Data(i * 1000, i * 20)));
+ }
+ p2.flush();
+
+ // verify c0
+ for (int i = 0; i < numMessages; i++) {
+ Message<GenericRecord> wrapper = c0.receive();
+ log.info("schema version {}", BytesSchemaVersion.of(wrapper.getSchemaVersion()));
+ KeyValue<GenericRecord, GenericRecord> data = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNotNull(wrapper.getSchemaVersion());
+ assertEquals(data.getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getField("i"), i * 1000);
+ assertEquals(data.getKey().getField("j"), i);
+ assertEquals(data.getValue().getField("j"), i * 20);
+ }
+
}
}
}
@@ -626,12 +829,12 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
PulsarClientImpl binaryProtocolClient = (PulsarClientImpl) pulsarClient;
pulsarClient.newProducer(Schema.AVRO(V1Data.class))
- .topic(topic)
- .create();
+ .topic(topic)
+ .create();
pulsarClient.newProducer(Schema.AVRO(V2Data.class))
- .topic(topic)
- .create();
+ .topic(topic)
+ .create();
LookupService httpLookupService = httpProtocolClient.getLookup();
LookupService binaryLookupService = binaryProtocolClient.getLookup();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 07c807c..1e43fc4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -191,7 +191,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
return LocalDateTimeSchema.of();
case JSON:
case AVRO:
- return GenericSchemaImpl.of(schemaInfo);
+ return GenericSchemaImpl.of(schemaInfo, false);
case PROTOBUF_NATIVE:
return GenericProtobufNativeSchema.of(schemaInfo);
case KEY_VALUE: