You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/25 05:36:19 UTC

[pulsar] branch master updated: [pulsar-clients]Save name, type, properties for KeyValueSchema (#4108)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 824a654  [pulsar-clients]Save name, type, properties for KeyValueSchema (#4108)
824a654 is described below

commit 824a6541d1d5644e57686d14a31745f3e3df3e30
Author: tuteng <eg...@gmail.com>
AuthorDate: Thu Apr 25 13:36:14 2019 +0800

    [pulsar-clients]Save name, type, properties for KeyValueSchema (#4108)
    
    
    ### Motivation
    
    The current keySchemaInfo and valueSchenamaInfo of KeyValueSchemahas no saved name, type and properties.
    
    ### Modifications
    
    Save name, type, properties for keySchemaInfo and valueSchemaInfo of KeyValueSchema
    
    ### Verifying this change
    
    Test pass
---
 .../pulsar/client/impl/schema/KeyValueSchema.java  | 19 ++++++++++++--
 .../client/impl/schema/KeyValueSchemaTest.java     | 29 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index a268aea..9a913d1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.client.impl.schema;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 import lombok.Getter;
 
 import org.apache.pulsar.client.api.Schema;
@@ -29,6 +32,7 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
+
 /**
  * [Key, Value] pair schema definition
  */
@@ -82,8 +86,19 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
 
         ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keySchemaInfo.length + 4 + valueSchemaInfo.length);
         byteBuffer.putInt(keySchemaInfo.length).put(keySchemaInfo)
-            .putInt(valueSchemaInfo.length).put(valueSchemaInfo);
-        this.schemaInfo.setSchema(byteBuffer.array());
+                .putInt(valueSchemaInfo.length).put(valueSchemaInfo);
+
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.name", keySchema.getSchemaInfo().getName());
+        properties.put("key.schema.type", String.valueOf(keySchema.getSchemaInfo().getType()));
+        Gson keySchemaGson = new Gson();
+        properties.put("key.schema.properties", keySchemaGson.toJson(keySchema.getSchemaInfo().getProperties()));
+        properties.put("value.schema.name", valueSchema.getSchemaInfo().getName());
+        properties.put("value.schema.type", String.valueOf(valueSchema.getSchemaInfo().getType()));
+        Gson valueSchemaGson = new Gson();
+        properties.put("value.schema.properties", valueSchemaGson.toJson(valueSchema.getSchemaInfo().getProperties()));
+
+        this.schemaInfo.setSchema(byteBuffer.array()).setProperties(properties);
     }
 
     // encode as bytes: [key.length][key.bytes][value.length][value.bytes]
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index a32e31d..13fe029 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema;
 
 import static org.testng.Assert.assertEquals;
 
+import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -31,6 +32,8 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.Map;
+
 @Slf4j
 public class KeyValueSchemaTest {
 
@@ -60,6 +63,32 @@ public class KeyValueSchemaTest {
     }
 
     @Test
+    public void testFillParametersToSchemainfo() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+
+        fooSchema.getSchemaInfo().setName("foo");
+        fooSchema.getSchemaInfo().setType(SchemaType.AVRO);
+        Map<String, String> keyProperties = Maps.newTreeMap();
+        keyProperties.put("foo.key1", "value");
+        keyProperties.put("foo.key2", "value");
+        fooSchema.getSchemaInfo().setProperties(keyProperties);
+        barSchema.getSchemaInfo().setName("bar");
+        barSchema.getSchemaInfo().setType(SchemaType.AVRO);
+        Map<String, String> valueProperties = Maps.newTreeMap();
+        valueProperties.put("bar.key", "key");
+        barSchema.getSchemaInfo().setProperties(valueProperties);
+        Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
+
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.name"), "foo");
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.type"), String.valueOf(SchemaType.AVRO));
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"), "{\"foo.key1\":\"value\",\"foo.key2\":\"value\"}");
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.name"), "bar");
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.type"), String.valueOf(SchemaType.AVRO));
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"), "{\"bar.key\":\"key\"}");
+    }
+
+    @Test
     public void testNotAllowNullAvroSchemaCreate() {
         AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
         AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());