You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/25 05:36:19 UTC
[pulsar] branch master updated: [pulsar-clients]Save name, type,
properties for KeyValueSchema (#4108)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 824a654 [pulsar-clients]Save name, type, properties for KeyValueSchema (#4108)
824a654 is described below
commit 824a6541d1d5644e57686d14a31745f3e3df3e30
Author: tuteng <eg...@gmail.com>
AuthorDate: Thu Apr 25 13:36:14 2019 +0800
[pulsar-clients]Save name, type, properties for KeyValueSchema (#4108)
### Motivation
The current keySchemaInfo and valueSchemaInfo of KeyValueSchema have no saved name, type, or properties.
### Modifications
Save name, type, properties for keySchemaInfo and valueSchemaInfo of KeyValueSchema
### Verifying this change
Tests pass.
---
.../pulsar/client/impl/schema/KeyValueSchema.java | 19 ++++++++++++--
.../client/impl/schema/KeyValueSchemaTest.java | 29 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index a268aea..9a913d1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.client.impl.schema;
import static com.google.common.base.Preconditions.checkArgument;
import java.nio.ByteBuffer;
+import java.util.Map;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
import lombok.Getter;
import org.apache.pulsar.client.api.Schema;
@@ -29,6 +32,7 @@ import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+
/**
* [Key, Value] pair schema definition
*/
@@ -82,8 +86,19 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keySchemaInfo.length + 4 + valueSchemaInfo.length);
byteBuffer.putInt(keySchemaInfo.length).put(keySchemaInfo)
- .putInt(valueSchemaInfo.length).put(valueSchemaInfo);
- this.schemaInfo.setSchema(byteBuffer.array());
+ .putInt(valueSchemaInfo.length).put(valueSchemaInfo);
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.name", keySchema.getSchemaInfo().getName());
+ properties.put("key.schema.type", String.valueOf(keySchema.getSchemaInfo().getType()));
+ Gson keySchemaGson = new Gson();
+ properties.put("key.schema.properties", keySchemaGson.toJson(keySchema.getSchemaInfo().getProperties()));
+ properties.put("value.schema.name", valueSchema.getSchemaInfo().getName());
+ properties.put("value.schema.type", String.valueOf(valueSchema.getSchemaInfo().getType()));
+ Gson valueSchemaGson = new Gson();
+ properties.put("value.schema.properties", valueSchemaGson.toJson(valueSchema.getSchemaInfo().getProperties()));
+
+ this.schemaInfo.setSchema(byteBuffer.array()).setProperties(properties);
}
// encode as bytes: [key.length][key.bytes][value.length][value.bytes]
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index a32e31d..13fe029 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema;
import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -31,6 +32,8 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Map;
+
@Slf4j
public class KeyValueSchemaTest {
@@ -60,6 +63,32 @@ public class KeyValueSchemaTest {
}
@Test
+ public void testFillParametersToSchemainfo() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+
+ fooSchema.getSchemaInfo().setName("foo");
+ fooSchema.getSchemaInfo().setType(SchemaType.AVRO);
+ Map<String, String> keyProperties = Maps.newTreeMap();
+ keyProperties.put("foo.key1", "value");
+ keyProperties.put("foo.key2", "value");
+ fooSchema.getSchemaInfo().setProperties(keyProperties);
+ barSchema.getSchemaInfo().setName("bar");
+ barSchema.getSchemaInfo().setType(SchemaType.AVRO);
+ Map<String, String> valueProperties = Maps.newTreeMap();
+ valueProperties.put("bar.key", "key");
+ barSchema.getSchemaInfo().setProperties(valueProperties);
+ Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
+
+ assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.name"), "foo");
+ assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.type"), String.valueOf(SchemaType.AVRO));
+ assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"), "{\"foo.key1\":\"value\",\"foo.key2\":\"value\"}");
+ assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.name"), "bar");
+ assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.type"), String.valueOf(SchemaType.AVRO));
+ assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"), "{\"bar.key\":\"key\"}");
+ }
+
+ @Test
public void testNotAllowNullAvroSchemaCreate() {
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());