You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/05/22 16:23:43 UTC
[pulsar] branch master updated: [schema] AutoConsume should use the
schema associated with messages as both writer and reader schema (#4325)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bf06ef3 [schema] AutoConsume should use the schema associated with messages as both writer and reader schema (#4325)
bf06ef3 is described below
commit bf06ef3ebeccfe758a3db1724bac62dd99a060de
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu May 23 00:23:37 2019 +0800
[schema] AutoConsume should use the schema associated with messages as both writer and reader schema (#4325)
* [schema] AutoConsume should use the schema associated with messages for both writer and reader schema
*Motivation*
AutoConsume should use the schema associated with the messages for decoding the schemas.
*Modifications*
- provide a flag enable or disable using the provided schema as the reader schema
- for AUTO_CONSUME schema, disable usnig the provided schema as the reader schema. so it can use the right
schema version for decoding messages into right generic records
- provide a few util methods for displaying schema data
* Handle 64 bytes schema version
* Addressed review comments
---
.../apache/pulsar/common/schema/SchemaInfo.java | 18 +++
.../pulsar/client/impl/PulsarClientImpl.java | 11 +-
.../pulsar/client/impl/schema/AvroSchema.java | 9 +-
.../pulsar/client/impl/schema/JSONSchema.java | 1 -
.../pulsar/client/impl/schema/ProtobufSchema.java | 4 +-
.../pulsar/client/impl/schema/SchemaUtils.java | 22 +++-
.../pulsar/client/impl/schema/StructSchema.java | 6 +
.../impl/schema/generic/GenericAvroSchema.java | 23 +++-
.../impl/schema/generic/GenericJsonSchema.java | 29 ++++-
.../impl/schema/generic/GenericSchemaImpl.java | 17 ++-
.../integration/cli/SchemaUpdateStrategyTest.java | 140 ++++++++++++++++++---
11 files changed, 245 insertions(+), 35 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 87d0e15..c5d1b72 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.schema;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Base64;
import java.util.Collections;
import java.util.Map;
@@ -52,4 +55,19 @@ public class SchemaInfo {
* Additional properties of the schema definition (implementation defined)
*/
private Map<String, String> properties = Collections.emptyMap();
+
+ public String getSchemaDefinition() {
+ if (null == schema) {
+ return "";
+ }
+
+ switch (type) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF:
+ return new String(schema, UTF_8);
+ default:
+ return Base64.getEncoder().encodeToString(schema);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 73a4a3c..57d1956 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -730,13 +730,16 @@ public class PulsarClientImpl implements PulsarClient {
if (schema instanceof AutoConsumeSchema) {
SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema();
- if (schemaInfo.getType() != SchemaType.AVRO){
+ if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
throw new RuntimeException("Currently schema detection only works for topics with avro schemas");
-
}
- GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfoProvider.getLatestSchema());
+
+ // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
+ // to decode the messages.
+ GenericSchema genericSchema = GenericSchemaImpl.of(
+ schemaInfoProvider.getLatestSchema(), false /*useProvidedSchemaAsReaderSchema*/);
log.info("Auto detected schema for topic {} : {}",
- topicName, new String(schemaInfo.getSchema(), UTF_8));
+ topicName, schemaInfo.getSchemaDefinition());
((AutoConsumeSchema) schema).setSchema(genericSchema);
}
schema.setSchemaInfoProvider(schemaInfoProvider);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 14021f2..2944e14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -102,11 +102,16 @@ public class AvroSchema<T> extends StructSchema<T> {
protected SchemaReader<T> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
if (schemaInfo != null) {
- return new AvroReader<>(parseAvroSchema(new String(schemaInfo.getSchema())), schema);
+ log.info("Load schema reader for version({}), schema is : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion),
+ schemaInfo.getSchemaDefinition());
+ return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
} else {
+ log.warn("No schema found for version({}), use latest schema : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion),
+ this.schemaInfo.getSchemaDefinition());
return reader;
}
}
-
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 4646465..04b249c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -27,7 +27,6 @@ import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.reader.JsonReader;
import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 9328908..b40f2ef 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -27,7 +27,6 @@ import lombok.Getter;
import org.apache.avro.protobuf.ProtobufData;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -96,7 +95,8 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
@Override
protected SchemaReader<T> loadReader(byte[] schemaVersion) {
- throw new RuntimeException("ProtobufSchema don't support schema versioning"); }
+ throw new RuntimeException("ProtobufSchema don't support schema versioning");
+ }
public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
index dce7192..7ffdd18 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
@@ -22,15 +22,17 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
/**
* Utils for schemas.
*/
-final class SchemaUtils {
+public final class SchemaUtils {
private SchemaUtils() {}
@@ -149,4 +151,22 @@ final class SchemaUtils {
}
}
+ public static String getStringSchemaVersion(byte[] schemaVersionBytes) {
+ if (null == schemaVersionBytes) {
+ return "NULL";
+ } else if (
+ // the length of schema version is 8 bytes post 2.4.0
+ schemaVersionBytes.length == Long.BYTES
+ // the length of schema version is 64 bytes before 2.4.0
+ || schemaVersionBytes.length == Long.SIZE) {
+ ByteBuffer bb = ByteBuffer.wrap(schemaVersionBytes);
+ return String.valueOf(bb.getLong());
+ } else if (schemaVersionBytes.length == 0) {
+ return "EMPTY";
+ } else {
+ return Base64.getEncoder().encodeToString(schemaVersionBytes);
+ }
+
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 89427d4..dd618ae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -131,6 +131,12 @@ public abstract class StructSchema<T> implements Schema<T> {
this.schemaInfoProvider = schemaInfoProvider;
}
+ /**
+ * Load the schema reader for reading messages encoded by the given schema version.
+ *
+ * @param schemaVersion the provided schema version
+ * @return the schema reader for decoding messages encoded by the provided schema version.
+ */
protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);
protected void setWriter(SchemaWriter<T> writer) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 82eb119..2fa6829 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -18,18 +18,27 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
* A generic avro schema.
*/
+@Slf4j
public class GenericAvroSchema extends GenericSchemaImpl {
public GenericAvroSchema(SchemaInfo schemaInfo) {
- super(schemaInfo);
+ this(schemaInfo, true);
+ }
+
+ GenericAvroSchema(SchemaInfo schemaInfo,
+ boolean useProvidedSchemaAsReaderSchema) {
+ super(schemaInfo, useProvidedSchemaAsReaderSchema);
setReader(new GenericAvroReader(schema));
setWriter(new GenericAvroWriter(schema));
}
@@ -48,11 +57,19 @@ public class GenericAvroSchema extends GenericSchemaImpl {
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
if (schemaInfo != null) {
+ log.info("Load schema reader for version({}), schema is : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion),
+ schemaInfo.getSchemaDefinition());
+ Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
+ Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
return new GenericAvroReader(
- parseAvroSchema(new String(schemaInfo.getSchema())),
- schema,
+ writerSchema,
+ readerSchema,
schemaVersion);
} else {
+ log.warn("No schema found for version({}), use latest schema : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion),
+ this.schemaInfo.getSchemaDefinition());
return reader;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 14d939f..3b82591 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -20,21 +20,28 @@ package org.apache.pulsar.client.impl.schema.generic;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.schema.SchemaInfo;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
/**
* A generic json schema.
*/
+@Slf4j
class GenericJsonSchema extends GenericSchemaImpl {
public GenericJsonSchema(SchemaInfo schemaInfo) {
- super(schemaInfo);
+ this(schemaInfo, true);
+ }
+
+ GenericJsonSchema(SchemaInfo schemaInfo,
+ boolean useProvidedSchemaAsReaderSchema) {
+ super(schemaInfo, useProvidedSchemaAsReaderSchema);
setWriter(new GenericJsonWriter());
setReader(new GenericJsonReader(fields));
}
@@ -43,12 +50,24 @@ class GenericJsonSchema extends GenericSchemaImpl {
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
if (schemaInfo != null) {
+ log.info("Load schema reader for version({}), schema is : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion),
+ schemaInfo.getSchemaDefinition());
+ Schema readerSchema;
+ if (useProvidedSchemaAsReaderSchema) {
+ readerSchema = schema;
+ } else {
+ readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
+ }
return new GenericJsonReader(schemaVersion,
- (parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8)).getFields()
+ readerSchema.getFields()
.stream()
.map(f -> new Field(f.name(), f.pos()))
- .collect(Collectors.toList())));
+ .collect(Collectors.toList()));
} else {
+ log.warn("No schema found for version({}), use latest schema : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion),
+ this.schemaInfo.getSchemaDefinition());
return reader;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index a8aa283..f22c449 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -33,14 +33,20 @@ import org.apache.pulsar.common.schema.SchemaInfo;
public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> implements GenericSchema<GenericRecord> {
protected final List<Field> fields;
+ // the flag controls whether to use the provided schema as reader schema
+ // to decode the messages. In `AUTO_CONSUME` mode, setting this flag to `false`
+ // allows decoding the messages using the schema associated with the messages.
+ protected final boolean useProvidedSchemaAsReaderSchema;
- protected GenericSchemaImpl(SchemaInfo schemaInfo) {
+ protected GenericSchemaImpl(SchemaInfo schemaInfo,
+ boolean useProvidedSchemaAsReaderSchema) {
super(schemaInfo);
this.fields = schema.getFields()
.stream()
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList());
+ this.useProvidedSchemaAsReaderSchema = useProvidedSchemaAsReaderSchema;
}
@Override
@@ -55,11 +61,16 @@ public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> impl
* @return a generic schema instance
*/
public static GenericSchemaImpl of(SchemaInfo schemaInfo) {
+ return of(schemaInfo, true);
+ }
+
+ public static GenericSchemaImpl of(SchemaInfo schemaInfo,
+ boolean useProvidedSchemaAsReaderSchema) {
switch (schemaInfo.getType()) {
case AVRO:
- return new GenericAvroSchema(schemaInfo);
+ return new GenericAvroSchema(schemaInfo, useProvidedSchemaAsReaderSchema);
case JSON:
- return new GenericJsonSchema(schemaInfo);
+ return new GenericJsonSchema(schemaInfo, useProvidedSchemaAsReaderSchema);
default:
throw new UnsupportedOperationException("Generic schema is not supported on schema type "
+ schemaInfo.getType() + "'");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
index 5a3ca47..e5f368f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/SchemaUpdateStrategyTest.java
@@ -18,17 +18,22 @@
*/
package org.apache.pulsar.tests.integration.cli;
-import static java.nio.charset.StandardCharsets.UTF_8;
+
+import static org.testng.Assert.assertEquals;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.reflect.AvroAlias;
import org.apache.avro.reflect.AvroDefault;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
@@ -58,8 +63,9 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
+ V1Data v1Data = new V1Data("test1", 1);
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
- p.send(new V1Data("test1", 1));
+ p.send(v1Data);
}
log.info("try with forward compat, should fail");
@@ -70,8 +76,25 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
}
log.info("try with backward compat, should succeed");
+ V2Data v2Data = new V2Data("test2");
try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
- p.send(new V2Data("test2"));
+ p.send(v2Data);
+ }
+
+ Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+ try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
+ .topic(topicName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("sub")
+ .subscribe()
+ ) {
+ log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
+
+ Message<GenericRecord> msg1 = consumer.receive();
+ v1Data.assertEqualToRecord(msg1.getValue());
+
+ Message<GenericRecord> msg2 = consumer.receive();
+ v2Data.assertEqualToRecord(msg2.getValue());
}
}
}
@@ -85,18 +108,40 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
+ V1Data v1Data = new V1Data("test1", 1);
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
- p.send(new V1Data("test1", 1));
+ p.send(v1Data);
}
log.info("try with forward compat, should succeed");
+ V3Data v3Data = new V3Data("test3", 1, 2);
try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
- p.send(new V3Data("test3", 1, 2));
+ p.send(v3Data);
}
log.info("try with backward compat, should succeed");
+ V2Data v2Data = new V2Data("test2");
try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic(topicName).create()) {
- p.send(new V2Data("test2"));
+ p.send(v2Data);
+ }
+
+ Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+ try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
+ .topic(topicName)
+ .subscriptionName("sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()
+ ) {
+ log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
+
+ Message<GenericRecord> msg1 = consumer.receive();
+ v1Data.assertEqualToRecord(msg1.getValue());
+
+ Message<GenericRecord> msg2 = consumer.receive();
+ v3Data.assertEqualToRecord(msg2.getValue());
+
+ Message<GenericRecord> msg3 = consumer.receive();
+ v2Data.assertEqualToRecord(msg3.getValue());
}
}
}
@@ -111,8 +156,9 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
+ V1Data v1Data = new V1Data("test1", 1);
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
- p.send(new V1Data("test1", 1));
+ p.send(v1Data);
}
log.info("try with backward compat, should fail");
@@ -123,13 +169,25 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
}
log.info("try with forward compat, should succeed");
+ V3Data v3Data = new V3Data("test2", 1, 2);
try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class)).topic(topicName).create()) {
- p.send(new V3Data("test2", 1, 2));
+ p.send(v3Data);
}
- log.info("try with fully compat, should succeed");
- try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
- p.send(new V4Data("test2", 1, (short)100));
+ Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+ try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
+ .topic(topicName)
+ .subscriptionName("sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()
+ ) {
+ log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
+
+ Message<GenericRecord> msg1 = consumer.receive();
+ v1Data.assertEqualToRecord(msg1.getValue());
+
+ Message<GenericRecord> msg2 = consumer.receive();
+ v3Data.assertEqualToRecord(msg2.getValue());
}
}
@@ -142,8 +200,10 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build()) {
+
+ V1Data v1Data = new V1Data("test1", 1);
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
- p.send(new V1Data("test1", 1));
+ p.send(v1Data);
}
log.info("try with backward compat only, should fail");
@@ -161,8 +221,25 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
}
log.info("try with fully compat");
+ V4Data v4Data = new V4Data("test2", 1, (short) 100);
try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class)).topic(topicName).create()) {
- p.send(new V4Data("test2", 1, (short)100));
+ p.send(v4Data);
+ }
+
+ Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+ try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
+ .topic(topicName)
+ .subscriptionName("sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()
+ ) {
+ log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
+
+ Message<GenericRecord> msg1 = consumer.receive();
+ v1Data.assertEqualToRecord(msg1.getValue());
+
+ Message<GenericRecord> msg2 = consumer.receive();
+ v4Data.assertEqualToRecord(msg2.getValue());
}
}
}
@@ -204,7 +281,7 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
ObjectMapper mapper = new ObjectMapper();
Map<String, String> schema = new HashMap<>();
schema.put("type", "AVRO");
- schema.put("schema", new String(Schema.AVRO(V4Data.class).getSchemaInfo().getSchema(), UTF_8));
+ schema.put("schema", Schema.AVRO(V4Data.class).getSchemaInfo().getSchemaDefinition());
BrokerContainer b = pulsarCluster.getAnyBroker();
String schemaFile = String.format("/tmp/schema-%s", UUID.randomUUID().toString());
b.putFile(schemaFile, mapper.writeValueAsBytes(schema));
@@ -236,6 +313,14 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
this.foo = foo;
this.bar = bar;
}
+
+ void assertEqualToRecord(GenericRecord record) {
+ assertEquals(
+ 2, record.getFields().size(),
+ record.getFields().size() + " fields in found : " + record.getFields());
+ assertEquals(foo, record.getField("foo"));
+ assertEquals(Integer.valueOf(bar), record.getField("bar"));
+ }
}
// backward compatible with V1Data
@@ -246,6 +331,13 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
V2Data(String foo) {
this.foo = foo;
}
+
+ void assertEqualToRecord(GenericRecord record) {
+ assertEquals(
+ 1, record.getFields().size(),
+ record.getFields().size() + " fields in found : " + record.getFields());
+ assertEquals(foo, record.getField("foo"));
+ }
}
// forward compatible with V1Data
@@ -260,6 +352,15 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
this.bar = bar;
this.baz = baz;
}
+
+ void assertEqualToRecord(GenericRecord record) {
+ assertEquals(
+ 3, record.getFields().size(),
+ record.getFields().size() + " fields in found : " + record.getFields());
+ assertEquals(foo, record.getField("foo"));
+ assertEquals(Integer.valueOf(bar), record.getField("bar"));
+ assertEquals(Long.valueOf(baz), record.getField("baz"));
+ }
}
// fully compatible with V1Data
@@ -275,6 +376,17 @@ public class SchemaUpdateStrategyTest extends PulsarTestSuite {
this.bar = bar;
this.blah = blah;
}
+
+ void assertEqualToRecord(GenericRecord record) {
+ assertEquals(
+ 3, record.getFields().size(),
+ record.getFields().size() + " fields in found : " + record.getFields());
+ assertEquals(foo, record.getField("foo"));
+ assertEquals(Integer.valueOf(bar), record.getField("bar"));
+ // NOTE: in generic record, avro returns integer. we can consider improving the
+ // the behavior in future to reflect the right java class.
+ assertEquals(Integer.valueOf(blah), record.getField("blah"));
+ }
}
@Test