You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/20 02:57:14 UTC
[pulsar] branch master updated: [issue#4155][pulsar-clients]Support
key value schema versioning (#4211)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 58986a8 [issue#4155][pulsar-clients]Support key value schema versioning (#4211)
58986a8 is described below
commit 58986a854db545a2042b98d2a7fd59ab93bc7d53
Author: tuteng <eg...@gmail.com>
AuthorDate: Mon May 20 10:57:08 2019 +0800
[issue#4155][pulsar-clients]Support key value schema versioning (#4211)
Fixes #4155
Master Issue: #4155
### Motivation
Similar to AVRO and JSON schema, we need to support schema versioning for key/value schema.
### Modifications
Support key value schema versioning.
### Verifying this change
unit test pass
---
.../org/apache/pulsar/client/impl/MessageImpl.java | 42 +++-
.../pulsar/client/impl/schema/KeyValueSchema.java | 125 +++++++++-
.../apache/pulsar/client/impl/MessageImplTest.java | 266 ++++++++++++++++++++-
.../client/impl/schema/KeyValueSchemaTest.java | 10 +-
.../SupportVersioningKeyValueSchemaTest.java | 166 +++++++++++++
5 files changed, 581 insertions(+), 28 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index d11d177..372fb23 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -242,18 +242,42 @@ public class MessageImpl<T> implements Message<T> {
@Override
public T getValue() {
- // check if the schema passed in from client supports schema versioning or not
- // this is an optimization to only get schema version when necessary
- byte [] schemaVersion = getSchemaVersion();
- if (schema.supportSchemaVersioning() && schemaVersion != null) {
- return schema.decode(getData(), schemaVersion);
- } else if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
- KeyValueSchema kvSchema = (KeyValueSchema) schema;
- if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
- return schema.decode(getKeyBytes(), getData());
+ if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
+ if (schema.supportSchemaVersioning()) {
+ return getKeyValueBySchemaVersion();
+ } else {
+ return getKeyValue();
+ }
+ } else {
+ // check if the schema passed in from client supports schema versioning or not
+ // this is an optimization to only get schema version when necessary
+ if (schema.supportSchemaVersioning()) {
+ byte[] schemaVersion = getSchemaVersion();
+ if (null == schemaVersion) {
+ return schema.decode(getData());
+ } else {
+ return schema.decode(getData(), schemaVersion);
+ }
} else {
return schema.decode(getData());
}
+ }
+ }
+
+ private T getKeyValueBySchemaVersion() {
+ KeyValueSchema kvSchema = (KeyValueSchema) schema;
+ byte[] schemaVersion = getSchemaVersion();
+ if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+ return (T) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
+ } else {
+ return schema.decode(getData(), schemaVersion);
+ }
+ }
+
+ private T getKeyValue() {
+ KeyValueSchema kvSchema = (KeyValueSchema) schema;
+ if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+ return (T) kvSchema.decode(getKeyBytes(), getData(), null);
} else {
return schema.decode(getData());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index d91140b..066d3b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
import com.google.common.collect.Maps;
@@ -30,12 +31,12 @@ import lombok.Getter;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
-
/**
* [Key, Value] pair schema definition
*/
@@ -53,6 +54,8 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
@Getter
private final KeyValueEncodingType keyValueEncodingType;
+ protected SchemaInfoProvider schemaInfoProvider;
+
/**
* Key Value Schema using passed in schema type, support JSON and AVRO currently.
*/
@@ -85,6 +88,10 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
return KV_BYTES;
}
+ @Override
+ public boolean supportSchemaVersioning() {
+ return keySchema.supportSchemaVersioning() || valueSchema.supportSchemaVersioning();
+ }
private KeyValueSchema(Schema<K> keySchema,
Schema<V> valueSchema) {
@@ -97,6 +104,46 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
this.keySchema = keySchema;
this.valueSchema = valueSchema;
+ if (keySchema instanceof StructSchema) {
+ ((StructSchema) keySchema).setSchemaInfoProvider(new SchemaInfoProvider() {
+ @Override
+ public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
+ SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+ return decodeKeyValueSchemaInfo(versionSchemaInfo).getKey();
+ }
+
+ @Override
+ public SchemaInfo getLatestSchema() {
+ return ((StructSchema<K>) keySchema).schemaInfo;
+ }
+
+ @Override
+ public String getTopicName() {
+ return "key-schema";
+ }
+ });
+ }
+
+ if (valueSchema instanceof StructSchema) {
+ ((StructSchema) valueSchema).setSchemaInfoProvider(new SchemaInfoProvider() {
+ @Override
+ public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
+ SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+ return decodeKeyValueSchemaInfo(versionSchemaInfo).getValue();
+ }
+
+ @Override
+ public SchemaInfo getLatestSchema() {
+ return ((StructSchema<V>) valueSchema).schemaInfo;
+ }
+
+ @Override
+ public String getTopicName() {
+ return "value-schema";
+ }
+ });
+ }
+
// set schemaInfo
this.schemaInfo = new SchemaInfo()
.setName("KeyValue")
@@ -125,6 +172,23 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
properties.put("kv.encoding.type", String.valueOf(keyValueEncodingType));
this.schemaInfo.setSchema(byteBuffer.array()).setProperties(properties);
+
+ this.schemaInfoProvider = new SchemaInfoProvider() {
+ @Override
+ public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
+ return schemaInfo;
+ }
+
+ @Override
+ public SchemaInfo getLatestSchema() {
+ return schemaInfo;
+ }
+
+ @Override
+ public String getTopicName() {
+ return "key-value-schema";
+ }
+ };
}
// encode as bytes: [key.length][key.bytes][value.length][value.bytes] or [value.bytes]
@@ -142,6 +206,10 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
}
public KeyValue<K, V> decode(byte[] bytes) {
+ return decode(bytes, null);
+ }
+
+ public KeyValue<K, V> decode(byte[] bytes, byte[] schemaVersion) {
if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
}
@@ -154,14 +222,63 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
byte[] valueBytes = new byte[valueLength];
byteBuffer.get(valueBytes);
- return decode(keyBytes, valueBytes);
+ return decode(keyBytes, valueBytes, schemaVersion);
}
- public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes) {
- return new KeyValue<>(keySchema.decode(keyBytes), valueSchema.decode(valueBytes));
+ public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
+ K k;
+ if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
+ k = keySchema.decode(keyBytes, schemaVersion);
+ } else {
+ k = keySchema.decode(keyBytes);
+ }
+ V v;
+ if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
+ v = valueSchema.decode(valueBytes, schemaVersion);
+ } else {
+ v = valueSchema.decode(valueBytes);
+ }
+ return new KeyValue<>(k, v);
}
public SchemaInfo getSchemaInfo() {
return this.schemaInfo;
}
+
+ public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+ this.schemaInfoProvider = schemaInfoProvider;
+ }
+
+ private static KeyValue<SchemaInfo, SchemaInfo> decodeKeyValueSchemaInfo(SchemaInfo schemaInfo) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(schemaInfo.getSchema());
+ int keySchemaLength = byteBuffer.getInt();
+ byte[] key = new byte[keySchemaLength];
+ byteBuffer.get(key);
+ int valueSchemaLength = byteBuffer.getInt();
+ byte[] value = new byte[valueSchemaLength];
+ byteBuffer.get(value);
+ Gson keySchemaGson = new Gson();
+ Map<String, String> keyProperties = Maps.newHashMap();
+ if (schemaInfo.getProperties().get("key.schema.properties") != null) {
+ keyProperties = keySchemaGson.fromJson(schemaInfo.getProperties().get("key.schema.properties"), Map.class);
+ } else {
+ keyProperties = Collections.emptyMap();
+ }
+ SchemaInfo keySchemaInfo = SchemaInfo.builder().schema(key)
+ .properties(keyProperties)
+ .name("")
+ .type(SchemaType.AVRO).build();
+ Gson valueSchemaGson = new Gson();
+ Map<String, String> valueProperties = Maps.newHashMap();
+ if (schemaInfo.getProperties().get("value.schema.properties") != null) {
+ valueProperties = valueSchemaGson.fromJson(schemaInfo.getProperties().get("value.schema.properties"), Map.class);
+ } else {
+ valueProperties = Collections.emptyMap();
+ }
+ SchemaInfo valueSchemaInfo = SchemaInfo.builder().schema(value)
+ .properties(valueProperties)
+ .name("")
+ .type(SchemaType.AVRO).build();
+ return new KeyValue<>(keySchemaInfo, valueSchemaInfo);
+ }
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 13a08b9..5475050 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -24,11 +24,19 @@ import java.util.Base64;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
+import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.testng.Assert;
import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertFalse;
@@ -81,8 +89,10 @@ public class MessageImplTest {
@Test
public void testDefaultGetProducerDataAssigned() {
- AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
- AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
@@ -96,7 +106,8 @@ public class MessageImplTest {
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata.Builder builder = MessageMetadata.newBuilder()
.setProducerName("default");
- MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+ MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+ builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
@@ -106,10 +117,13 @@ public class MessageImplTest {
@Test
public void testInlineGetProducerDataAssigned() {
- AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
- AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
- Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.INLINE);
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+ fooSchema, barSchema, KeyValueEncodingType.INLINE);
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
@@ -121,7 +135,8 @@ public class MessageImplTest {
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
MessageMetadata.Builder builder = MessageMetadata.newBuilder()
.setProducerName("inline");
- MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+ MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+ builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
@@ -130,10 +145,13 @@ public class MessageImplTest {
@Test
public void testSeparatedGetProducerDataAssigned() {
- AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
- AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
- Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+ fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
foo.setField2("field2");
@@ -147,10 +165,236 @@ public class MessageImplTest {
.setProducerName("separated");
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
builder.setPartitionKeyB64Encoded(true);
- MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+ MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+ builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+ assertEquals(keyValue.getKey(), foo);
+ assertEquals(keyValue.getValue(), bar);
+ assertTrue(builder.hasPartitionKey());
+ }
+
+ @Test
+ public void testDefaultAVROVersionGetProducerDataAssigned() {
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+ MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+ keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(keyValueSchema.getSchemaInfo());
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+ .setProducerName("default");
+ ByteString byteString = ByteString.copyFrom(new byte[10]);
+ builder.setSchemaVersion(byteString);
+ MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+ builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+ assertEquals(keyValue.getKey(), foo);
+ assertEquals(keyValue.getValue(), bar);
+ assertFalse(builder.hasPartitionKey());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.INLINE);
+ }
+
+ @Test
+ public void testSeparatedAVROVersionGetProducerDataAssigned() {
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+ MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+ fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+ keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(keyValueSchema.getSchemaInfo());
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+ .setProducerName("separated");
+ ByteString byteString = ByteString.copyFrom(new byte[10]);
+ builder.setSchemaVersion(byteString);
+ builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
+ builder.setPartitionKeyB64Encoded(true);
+ MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+ builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+ assertEquals(keyValue.getKey(), foo);
+ assertEquals(keyValue.getValue(), bar);
+ assertTrue(builder.hasPartitionKey());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.SEPARATED);
+ }
+
+ @Test
+ public void testDefaultJSONVersionGetProducerDataAssigned() {
+ JSONSchema<SchemaTestUtils.Foo> fooSchema = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(
+ SchemaTestUtils.Foo.class).build());
+ JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(
+ SchemaTestUtils.Bar.class).build());
+
+ MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+ keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(keyValueSchema.getSchemaInfo());
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+ .setProducerName("default");
+ ByteString byteString = ByteString.copyFrom(new byte[10]);
+ builder.setSchemaVersion(byteString);
+ MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+ builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+ assertEquals(keyValue.getKey(), foo);
+ assertEquals(keyValue.getValue(), bar);
+ assertFalse(builder.hasPartitionKey());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.INLINE);
+ }
+
+ @Test
+ public void testSeparatedJSONVersionGetProducerDataAssigned() {
+ JSONSchema<SchemaTestUtils.Foo> fooSchema = JSONSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+ MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+ fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+ keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(keyValueSchema.getSchemaInfo());
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+ .setProducerName("separated");
+ ByteString byteString = ByteString.copyFrom(new byte[10]);
+ builder.setSchemaVersion(byteString);
+ builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
+ builder.setPartitionKeyB64Encoded(true);
+ MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+ builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+ assertEquals(keyValue.getKey(), foo);
+ assertEquals(keyValue.getValue(), bar);
+ assertTrue(builder.hasPartitionKey());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.SEPARATED);
+ }
+
+ @Test
+ public void testDefaultAVROJSONVersionGetProducerDataAssigned() {
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(
+ SchemaTestUtils.Foo.class).build());
+ JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(
+ SchemaTestUtils.Bar.class).build());
+
+ MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+ keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(keyValueSchema.getSchemaInfo());
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+ .setProducerName("default");
+ ByteString byteString = ByteString.copyFrom(new byte[10]);
+ builder.setSchemaVersion(byteString);
+ MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+ builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+ assertEquals(keyValue.getKey(), foo);
+ assertEquals(keyValue.getValue(), bar);
+ assertFalse(builder.hasPartitionKey());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.INLINE);
+ }
+
+ @Test
+ public void testSeparatedAVROJSONVersionGetProducerDataAssigned() {
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+ MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+ fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+ keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(keyValueSchema.getSchemaInfo());
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+ .setProducerName("separated");
+ ByteString byteString = ByteString.copyFrom(new byte[10]);
+ builder.setSchemaVersion(byteString);
+ builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
+ builder.setPartitionKeyB64Encoded(true);
+ MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+ builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
assertEquals(keyValue.getKey(), foo);
assertEquals(keyValue.getValue(), bar);
assertTrue(builder.hasPartitionKey());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.SEPARATED);
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index 821ac24..9c903fa 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -36,6 +36,7 @@ import org.testng.annotations.Test;
import java.util.Map;
+
@Slf4j
public class KeyValueSchemaTest {
@@ -151,8 +152,10 @@ public class KeyValueSchemaTest {
@Test
public void testNotAllowNullJsonSchemaCreate() {
- JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
- JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
+ JSONSchema<Foo> fooSchema = JSONSchema.of(
+ SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(
+ SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
@@ -302,7 +305,6 @@ public class KeyValueSchemaTest {
Schema<KeyValue<Foo, Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
-
Bar bar = new Bar();
bar.setField1(true);
@@ -322,7 +324,7 @@ public class KeyValueSchemaTest {
} catch (SchemaSerializationException e) {
Assert.assertTrue(e.getMessage().contains("This method cannot be used under this SEPARATED encoding type"));
}
- KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(fooSchema.encode(foo), encodeBytes);
+ KeyValue<Foo, Bar> keyValue = ((KeyValueSchema)keyValueSchema).decode(fooSchema.encode(foo), encodeBytes, null);
Foo fooBack = keyValue.getKey();
Bar barBack = keyValue.getValue();
assertEquals(foo, fooBack);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
new file mode 100644
index 0000000..375baf5
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class SupportVersioningKeyValueSchemaTest {
+
+ @Test
+ public void testKeyValueVersioningEncodeDecode() {
+ MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+ fooSchema, barSchema);
+ keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(keyValueSchema.getSchemaInfo());
+
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ foo.setField4(bar);
+ foo.setColor(SchemaTestUtils.Color.RED);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = keyValueSchema.decode(
+ encodeBytes, new byte[10]);
+ Assert.assertEquals(keyValue.getKey().getField1(), foo.getField1());
+ Assert.assertEquals(keyValue.getKey().getField2(), foo.getField2());
+ Assert.assertEquals(keyValue.getKey().getField3(), foo.getField3());
+ Assert.assertEquals(keyValue.getKey().getField4(), foo.getField4());
+ Assert.assertEquals(keyValue.getKey().getColor(), foo.getColor());
+ Assert.assertTrue(keyValue.getValue().isField1());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.INLINE);
+ }
+
+ @Test
+ public void testSeparateKeyValueVersioningEncodeDecode() {
+ MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+ fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+ keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(keyValueSchema.getSchemaInfo());
+
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ foo.setField4(bar);
+ foo.setColor(SchemaTestUtils.Color.RED);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchema)keyValueSchema).decode(
+ fooSchema.encode(foo), encodeBytes, new byte[10]);
+ Assert.assertTrue(keyValue.getValue().isField1());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.SEPARATED);
+ }
+
+ @Test
+ public void testKeyValueDefaultVersioningEncodeDecode() {
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+ fooSchema, barSchema);
+
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ foo.setField4(bar);
+ foo.setColor(SchemaTestUtils.Color.RED);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = keyValueSchema.decode(
+ encodeBytes, new byte[10]);
+ Assert.assertEquals(keyValue.getKey().getField1(), foo.getField1());
+ Assert.assertEquals(keyValue.getKey().getField2(), foo.getField2());
+ Assert.assertEquals(keyValue.getKey().getField3(), foo.getField3());
+ Assert.assertEquals(keyValue.getKey().getField4(), foo.getField4());
+ Assert.assertEquals(keyValue.getKey().getColor(), foo.getColor());
+ Assert.assertTrue(keyValue.getValue().isField1());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.INLINE);
+ }
+
+ @Test
+ public void testKeyValueLatestVersioningEncodeDecode() {
+ AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+ AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+ SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+ Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+ fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+
+ SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+ bar.setField1(true);
+
+ SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ foo.setField4(bar);
+ foo.setColor(SchemaTestUtils.Color.RED);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchema)keyValueSchema).decode(
+ fooSchema.encode(foo), encodeBytes, new byte[10]);
+ Assert.assertTrue(keyValue.getValue().isField1());
+ Assert.assertEquals(
+ KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+ KeyValueEncodingType.SEPARATED);
+ }
+}