You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/20 05:30:46 UTC

[pulsar] branch master updated: [schema] key/value schema enhancement (#4548)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 82d9e71  [schema] key/value schema enhancement (#4548)
82d9e71 is described below

commit 82d9e7160734f0e3b666085b00946e655d4eccf8
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Jun 19 22:30:40 2019 -0700

    [schema] key/value schema enhancement (#4548)
    
    *Motivation*
    
    - The code for encoding and decoding key/value schema is spreading over multiple places.
    - Make code changes to prepare supporting key/value schema in AUTO consumers
    - Make schema tools display key/value schema in a pretty format
    
    *Modifications*
    
    - Move the common logic of encoding and decoding key/value schema to a common class KeyValueSchemaInfo
    - Expose the common class in DefaultImplementation so that it can be available for public usage
    - Fix the display problem on displaying key/value schema
    
    *Verify this change*
    
    - Add bunch of the unit tests for key/value schemas
---
 .../schema/KeyValueSchemaCompatibilityCheck.java   |  57 +----
 .../KeyValueSchemaCompatibilityCheckTest.java      |   8 -
 .../pulsar/client/api/schema/SchemaWriter.java     |   4 +
 .../client/internal/DefaultImplementation.java     |  91 ++++++-
 .../pulsar/client/internal/ReflectionUtils.java    |  10 +
 .../org/apache/pulsar/common/schema/KeyValue.java  |  85 +++++++
 .../apache/pulsar/common/schema/SchemaInfo.java    |  11 +
 .../org/apache/pulsar/admin/cli/CmdSchemas.java    |  24 +-
 .../pulsar/client/impl/schema/KeyValueSchema.java  | 102 ++------
 .../client/impl/schema/KeyValueSchemaInfo.java     | 221 ++++++++++++++++
 .../pulsar/client/impl/schema/SchemaUtils.java     | 157 ++++++++++++
 .../client/impl/schema/KeyValueSchemaInfoTest.java | 210 ++++++++++++++++
 .../pulsar/client/impl/schema/SchemaInfoTest.java  | 279 +++++++++++++++++++++
 .../apache/pulsar/common/schema/KeyValueTest.java  | 122 +++++++++
 .../pulsar/common/protocol/schema/SchemaData.java  |  30 +++
 .../pulsar/tests/integration/cli/CLITest.java      |   2 +-
 16 files changed, 1247 insertions(+), 166 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
index b7cf5b4..2426f3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
@@ -18,12 +18,12 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
-import com.google.gson.Gson;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map;
@@ -39,48 +39,13 @@ public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityChec
         this.checkers = checkers;
     }
 
-    private KeyValue<byte[], byte[]> splitKeyValueSchema(byte[] bytes) {
-        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-        int keyLength = byteBuffer.getInt();
-        byte[] keySchema = new byte[keyLength];
-        byteBuffer.get(keySchema);
-
-        int valueLength = byteBuffer.getInt();
-        byte[] valueSchema = new byte[valueLength];
-        byteBuffer.get(valueSchema);
-        return new KeyValue<>(keySchema, valueSchema);
-    }
-
-    private SchemaType fetchSchemaType(Map<String, String> properties, String key) {
-        if (properties.get(key) != null) {
-            return SchemaType.valueOf(properties.get(key));
-        }
-        return SchemaType.BYTES;
-    }
-
-    private SchemaData fetchSchemaData(
-            byte[] keyValue, SchemaType schemaType, Gson schemaGson, Map<String, String> properties, String key) {
-        if (properties.get(key) != null) {
-            return SchemaData.builder().data(keyValue)
-                    .type(schemaType)
-                    .props(schemaGson.fromJson(properties.get(key), Map.class)).build();
-        }
-        return SchemaData.builder().data(keyValue)
-                .type(schemaType)
-                .props(Collections.emptyMap()).build();
-    }
-
-    private KeyValue<SchemaData, SchemaData> splitKeyValueSchemaData(SchemaData schemaData) {
-        KeyValue<byte[], byte[]> keyValue = this.splitKeyValueSchema(schemaData.getData());
-        Map<String, String> properties = schemaData.getProps();
-        SchemaType keyType = fetchSchemaType(properties, "key.schema.type");
-        SchemaType valueType = fetchSchemaType(properties, "value.schema.type");
-        Gson schemaGson = new Gson();
-        SchemaData keySchemaData = fetchSchemaData(
-                keyValue.getKey(), keyType, schemaGson, properties, "key.schema.properties");
-        SchemaData valueSchemaData = fetchSchemaData(
-                keyValue.getValue(), valueType, schemaGson, properties, "value.schema.properties");
-        return new KeyValue<>(keySchemaData, valueSchemaData);
+    private KeyValue<SchemaData, SchemaData> decodeKeyValueSchemaData(SchemaData schemaData) {
+        KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+            KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaData.toSchemaInfo());
+        return new KeyValue<>(
+            SchemaData.fromSchemaInfo(schemaInfoKeyValue.getKey()),
+            SchemaData.fromSchemaInfo(schemaInfoKeyValue.getValue())
+        );
     }
 
     @Override
@@ -104,7 +69,7 @@ public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityChec
         LinkedList<SchemaData> fromKeyList = new LinkedList<>();
         LinkedList<SchemaData> fromValueList = new LinkedList<>();
         KeyValue<SchemaData, SchemaData> fromKeyValue;
-        KeyValue<SchemaData, SchemaData> toKeyValue = splitKeyValueSchemaData(to);
+        KeyValue<SchemaData, SchemaData> toKeyValue = decodeKeyValueSchemaData(to);
         SchemaType toKeyType = toKeyValue.getKey().getType();
         SchemaType toValueType = toKeyValue.getValue().getType();
 
@@ -112,7 +77,7 @@ public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityChec
             if (schemaData.getType() != SchemaType.KEY_VALUE) {
                 return false;
             }
-            fromKeyValue = splitKeyValueSchemaData(schemaData);
+            fromKeyValue = decodeKeyValueSchemaData(schemaData);
             if (fromKeyValue.getKey().getType() != toKeyType || fromKeyValue.getValue().getType() != toValueType) {
                 return false;
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
index b3d4716..0755781 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
@@ -441,13 +441,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         Map<String, String> fromProperties = Maps.newHashMap();
         fromProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         fromProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
-        fromProperties.put("key.schema.properties", null);
-        fromProperties.put("value.schema.properties", null);
         Map<String, String> toProperties = Maps.newHashMap();
         toProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         toProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
-        toProperties.put("key.schema.properties", null);
-        toProperties.put("value.schema.properties", null);
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
                 .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
@@ -460,11 +456,7 @@ public class KeyValueSchemaCompatibilityCheckTest {
         AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
         AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
         Map<String, String> fromProperties = Maps.newHashMap();
-        fromProperties.put("key.schema.type", null);
-        fromProperties.put("value.schema.type", null);
         Map<String, String> toProperties = Maps.newHashMap();
-        toProperties.put("key.schema.type", null);
-        toProperties.put("value.schema.type", null);
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
                 .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
index a93739a..65576d3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.client.api.schema;
 
+/**
+ * Serialize messages into bytes.
+ */
+@FunctionalInterface
 public interface SchemaWriter<T> {
 
     /**
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index 80282bf..c8418b9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -41,7 +41,11 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
-import org.apache.pulsar.client.api.schema.*;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -284,6 +288,91 @@ public class DefaultImplementation {
                         String.class).newInstance(name));
     }
 
+    /**
+     * Decode the kv encoding type from the schema info.
+     *
+     * @param schemaInfo the schema info
+     * @return the kv encoding type
+     */
+    public static KeyValueEncodingType decodeKeyValueEncodingType(SchemaInfo schemaInfo) {
+        return catchExceptions(
+            () -> (KeyValueEncodingType) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo",
+                "decodeKeyValueEncodingType", SchemaInfo.class
+            ).invoke(null, schemaInfo));
+    }
+
+    /**
+     * Encode key & value into schema into a KeyValue schema.
+     *
+     * @param keySchema the key schema
+     * @param valueSchema the value schema
+     * @param keyValueEncodingType the encoding type to encode and decode key value pair
+     * @return the final schema info
+     */
+    public static <K, V> SchemaInfo encodeKeyValueSchemaInfo(Schema<K> keySchema,
+                                                             Schema<V> valueSchema,
+                                                             KeyValueEncodingType keyValueEncodingType) {
+        return encodeKeyValueSchemaInfo("KeyValue", keySchema, valueSchema, keyValueEncodingType);
+    }
+
+    /**
+     * Encode key & value into schema into a KeyValue schema.
+     *
+     * @param schemaName the final schema name
+     * @param keySchema the key schema
+     * @param valueSchema the value schema
+     * @param keyValueEncodingType the encoding type to encode and decode key value pair
+     * @return the final schema info
+     */
+    public static <K, V> SchemaInfo encodeKeyValueSchemaInfo(String schemaName,
+                                                             Schema<K> keySchema,
+                                                             Schema<V> valueSchema,
+                                                             KeyValueEncodingType keyValueEncodingType) {
+        return catchExceptions(
+            () -> (SchemaInfo) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo",
+                "encodeKeyValueSchemaInfo", String.class, Schema.class, Schema.class, KeyValueEncodingType.class
+            ).invoke(null, schemaName, keySchema, valueSchema, keyValueEncodingType));
+    }
+
+    /**
+     * Decode the key/value schema info to get key schema info and value schema info.
+     *
+     * @param schemaInfo key/value schema info.
+     * @return the pair of key schema info and value schema info
+     */
+    public static KeyValue<SchemaInfo, SchemaInfo> decodeKeyValueSchemaInfo(SchemaInfo schemaInfo) {
+        return catchExceptions(
+            () -> (KeyValue<SchemaInfo, SchemaInfo>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo",
+                "decodeKeyValueSchemaInfo", SchemaInfo.class
+            ).invoke(null, schemaInfo));
+    }
+
+    /**
+     * Jsonify the schema info.
+     *
+     * @param schemaInfo the schema info
+     * @return the jsonified schema info
+     */
+    public static String jsonifySchemaInfo(SchemaInfo schemaInfo) {
+        return catchExceptions(
+            () -> (String) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
+                "jsonifySchemaInfo", SchemaInfo.class
+            ).invoke(null, schemaInfo));
+    }
+
+    /**
+     * Jsonify the key/value schema info.
+     *
+     * @param kvSchemaInfo the key/value schema info
+     * @return the jsonified schema info
+     */
+    public static String jsonifyKeyValueSchemaInfo(KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo) {
+        return catchExceptions(
+            () -> (String) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
+                "jsonifyKeyValueSchemaInfo", KeyValue.class
+            ).invoke(null, kvSchemaInfo));
+    }
+
     public static BatcherBuilder newDefaultBatcherBuilder() {
         return catchExceptions(
             () -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.DefaultBatcherBuilder")
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
index 2b4764c..4d9db6e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.internal;
 
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
 import lombok.experimental.UtilityClass;
@@ -33,6 +34,15 @@ class ReflectionUtils {
         try {
             return s.get();
         } catch (Throwable t) {
+            if (t instanceof InvocationTargetException) {
+                // exception is thrown during invocation
+                Throwable cause = t.getCause();
+                if (cause instanceof RuntimeException) {
+                    throw (RuntimeException) cause;
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            }
             throw new RuntimeException(t);
         }
     }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
index f95fe7d..3f66ff3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.common.schema;
 
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import org.apache.pulsar.client.api.Schema;
+
 /**
  * A simple KeyValue class
  */
@@ -37,4 +41,85 @@ public class KeyValue<K, V> {
     public V getValue() {
         return value;
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(key, value);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof KeyValue)) {
+            return false;
+        }
+        KeyValue<K, V> another = (KeyValue<K, V>) obj;
+        return Objects.equals(key, another.key)
+            && Objects.equals(value, another.value);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("(key = \"")
+          .append(key)
+          .append("\", value = \"")
+          .append(value)
+          .append("\")");
+        return sb.toString();
+    }
+
+    /**
+     * Decoder to decode key/value bytes.
+     */
+    @FunctionalInterface
+    public interface KeyValueDecoder<K, V> {
+
+        /**
+         * Decode key and value bytes into a {@link KeyValue} pair.
+         *
+         * @param keyData key data
+         * @param valueData value data
+         * @return the decoded {@link KeyValue} pair
+         */
+        KeyValue<K, V> decode(byte[] keyData, byte[] valueData);
+
+    }
+
+    /**
+     * Encode a <tt>key</tt> and <tt>value</tt> pair into a bytes array.
+     *
+     * @param key key object to encode
+     * @param keyWriter a writer to encode key object
+     * @param value value object to encode
+     * @param valueWriter a writer to encode value object
+     * @return the encoded bytes array
+     */
+    public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
+                                       V value, Schema<V> valueWriter) {
+        byte [] keyBytes = keyWriter.encode(key);
+        byte [] valueBytes = valueWriter.encode(value);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
+        byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
+        return byteBuffer.array();
+    }
+
+    /**
+     * Decode the value into a key/value pair.
+     *
+     * @param data the encoded bytes
+     * @param decoder the decoder to decode encoded key/value bytes
+     * @return the decoded key/value pair
+     */
+    public static <K, V> KeyValue<K, V> decode(byte[] data, KeyValueDecoder<K, V> decoder) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+        int keyLength = byteBuffer.getInt();
+        byte[] keyBytes = new byte[keyLength];
+        byteBuffer.get(keyBytes);
+
+        int valueLength = byteBuffer.getInt();
+        byte[] valueBytes = new byte[valueLength];
+        byteBuffer.get(valueBytes);
+
+        return decoder.decode(keyBytes, valueBytes);
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index c5d1b72..7b07632 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -30,6 +30,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.client.internal.DefaultImplementation;
 
 @Data
 @AllArgsConstructor
@@ -66,8 +67,18 @@ public class SchemaInfo {
             case JSON:
             case PROTOBUF:
                 return new String(schema, UTF_8);
+            case KEY_VALUE:
+                KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+                    DefaultImplementation.decodeKeyValueSchemaInfo(this);
+                return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
             default:
                 return Base64.getEncoder().encodeToString(schema);
         }
     }
+
+    @Override
+    public String toString(){
+        return DefaultImplementation.jsonifySchemaInfo(this);
+    }
+
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
index 5d71035..9973df8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -22,16 +22,8 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
-import java.lang.reflect.Type;
 import java.net.URL;
 import java.net.URLClassLoader;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonPrimitive;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
 import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
@@ -66,21 +58,7 @@ public class CmdSchemas extends CmdBase {
             } else {
                 schemaInfo = admin.schemas().getSchemaInfo(topic, version);
             }
-            Gson customGson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class,
-                    new ByteArrayToStringAdapter(schemaInfo)).create();
-            System.out.println(customGson.toJson(schemaInfo));
-        }
-    }
-
-    // Using Android's base64 libraries. This can be replaced with any base64 library.
-    private class ByteArrayToStringAdapter implements JsonSerializer<byte[]> {
-        private SchemaInfo schemaInfo;
-        public ByteArrayToStringAdapter(SchemaInfo schemaInfo) {
-            this.schemaInfo = schemaInfo;
-        }
-
-        public JsonElement serialize(byte[] src, Type typeOfSrc, JsonSerializationContext context) {
-            return new JsonPrimitive(schemaInfo.getSchemaDefinition());
+            System.out.println(schemaInfo);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index 066d3b3..7eb4757 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -19,16 +19,8 @@
 package org.apache.pulsar.client.impl.schema;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
 import lombok.Getter;
-
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
@@ -103,13 +95,17 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
                            KeyValueEncodingType keyValueEncodingType) {
         this.keySchema = keySchema;
         this.valueSchema = valueSchema;
+        this.keyValueEncodingType = keyValueEncodingType;
+        this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
+            keySchema, valueSchema, keyValueEncodingType
+        );
 
         if (keySchema instanceof StructSchema) {
-            ((StructSchema) keySchema).setSchemaInfoProvider(new SchemaInfoProvider() {
+            keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
                 @Override
                 public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
                     SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
-                    return decodeKeyValueSchemaInfo(versionSchemaInfo).getKey();
+                    return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getKey();
                 }
 
                 @Override
@@ -125,11 +121,11 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         }
 
         if (valueSchema instanceof StructSchema) {
-            ((StructSchema) valueSchema).setSchemaInfoProvider(new SchemaInfoProvider() {
+            valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() {
                 @Override
                 public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
                     SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
-                    return decodeKeyValueSchemaInfo(versionSchemaInfo).getValue();
+                    return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getValue();
                 }
 
                 @Override
@@ -144,35 +140,6 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
             });
         }
 
-        // set schemaInfo
-        this.schemaInfo = new SchemaInfo()
-                .setName("KeyValue")
-                .setType(SchemaType.KEY_VALUE);
-
-        byte[] keySchemaInfo = keySchema.getSchemaInfo().getSchema();
-        byte[] valueSchemaInfo = valueSchema.getSchemaInfo().getSchema();
-
-        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keySchemaInfo.length + 4 + valueSchemaInfo.length);
-        byteBuffer.putInt(keySchemaInfo.length).put(keySchemaInfo)
-                .putInt(valueSchemaInfo.length).put(valueSchemaInfo);
-
-        Map<String, String> properties = Maps.newHashMap();
-
-        properties.put("key.schema.name", keySchema.getSchemaInfo().getName());
-        properties.put("key.schema.type", String.valueOf(keySchema.getSchemaInfo().getType()));
-        Gson keySchemaGson = new Gson();
-        properties.put("key.schema.properties", keySchemaGson.toJson(keySchema.getSchemaInfo().getProperties()));
-        properties.put("value.schema.name", valueSchema.getSchemaInfo().getName());
-        properties.put("value.schema.type", String.valueOf(valueSchema.getSchemaInfo().getType()));
-        Gson valueSchemaGson = new Gson();
-        properties.put("value.schema.properties", valueSchemaGson.toJson(valueSchema.getSchemaInfo().getProperties()));
-
-        checkNotNull(keyValueEncodingType, "Null encoding type is provided");
-        this.keyValueEncodingType = keyValueEncodingType;
-        properties.put("kv.encoding.type", String.valueOf(keyValueEncodingType));
-
-        this.schemaInfo.setSchema(byteBuffer.array()).setProperties(properties);
-
         this.schemaInfoProvider = new SchemaInfoProvider() {
             @Override
             public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
@@ -194,11 +161,12 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
     // encode as bytes: [key.length][key.bytes][value.length][value.bytes] or [value.bytes]
     public byte[] encode(KeyValue<K, V> message) {
         if (keyValueEncodingType != null && keyValueEncodingType == KeyValueEncodingType.INLINE) {
-            byte [] keyBytes = keySchema.encode(message.getKey());
-            byte [] valueBytes = valueSchema.encode(message.getValue());
-            ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
-            byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
-            return byteBuffer.array();
+            return KeyValue.encode(
+                message.getKey(),
+                keySchema,
+                message.getValue(),
+                valueSchema
+            );
         } else {
             return valueSchema.encode(message.getValue());
         }
@@ -213,16 +181,8 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
             throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
         }
-        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-        int keyLength = byteBuffer.getInt();
-        byte[] keyBytes = new byte[keyLength];
-        byteBuffer.get(keyBytes);
 
-        int valueLength = byteBuffer.getInt();
-        byte[] valueBytes = new byte[valueLength];
-        byteBuffer.get(valueBytes);
-
-        return decode(keyBytes, valueBytes, schemaVersion);
+        return KeyValue.decode(bytes, (keyBytes, valueBytes) -> decode(keyBytes, valueBytes, schemaVersion));
     }
 
     public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
@@ -249,36 +209,4 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         this.schemaInfoProvider = schemaInfoProvider;
     }
 
-    private static KeyValue<SchemaInfo, SchemaInfo> decodeKeyValueSchemaInfo(SchemaInfo schemaInfo) {
-        ByteBuffer byteBuffer = ByteBuffer.wrap(schemaInfo.getSchema());
-        int keySchemaLength = byteBuffer.getInt();
-        byte[] key = new byte[keySchemaLength];
-        byteBuffer.get(key);
-        int valueSchemaLength = byteBuffer.getInt();
-        byte[] value = new byte[valueSchemaLength];
-        byteBuffer.get(value);
-        Gson keySchemaGson = new Gson();
-        Map<String, String> keyProperties = Maps.newHashMap();
-        if (schemaInfo.getProperties().get("key.schema.properties") != null) {
-            keyProperties = keySchemaGson.fromJson(schemaInfo.getProperties().get("key.schema.properties"), Map.class);
-        } else {
-            keyProperties = Collections.emptyMap();
-        }
-        SchemaInfo keySchemaInfo = SchemaInfo.builder().schema(key)
-                .properties(keyProperties)
-                .name("")
-                .type(SchemaType.AVRO).build();
-        Gson valueSchemaGson = new Gson();
-        Map<String, String> valueProperties = Maps.newHashMap();
-        if (schemaInfo.getProperties().get("value.schema.properties") != null) {
-            valueProperties = valueSchemaGson.fromJson(schemaInfo.getProperties().get("value.schema.properties"), Map.class);
-        } else {
-            valueProperties = Collections.emptyMap();
-        }
-        SchemaInfo valueSchemaInfo = SchemaInfo.builder().schema(value)
-                .properties(valueProperties)
-                .name("")
-                .type(SchemaType.AVRO).build();
-        return new KeyValue<>(keySchemaInfo, valueSchemaInfo);
-    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
new file mode 100644
index 0000000..6d54d57
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * Util class for processing key/value schema info.
+ */
+@Slf4j
+public final class KeyValueSchemaInfo {
+
+    private static final Schema<SchemaInfo> SCHEMA_INFO_WRITER = new Schema<SchemaInfo>() {
+        @Override
+        public byte[] encode(SchemaInfo si) {
+            return si.getSchema();
+        }
+
+        @Override
+        public SchemaInfo getSchemaInfo() {
+            return BytesSchema.BYTES.getSchemaInfo();
+        }
+    };
+
+    private static final String KEY_SCHEMA_NAME = "key.schema.name";
+    private static final String KEY_SCHEMA_TYPE = "key.schema.type";
+    private static final String KEY_SCHEMA_PROPS = "key.schema.properties";
+    private static final String VALUE_SCHEMA_NAME = "value.schema.name";
+    private static final String VALUE_SCHEMA_TYPE = "value.schema.type";
+    private static final String VALUE_SCHEMA_PROPS = "value.schema.properties";
+    private static final String KV_ENCODING_TYPE = "kv.encoding.type";
+
+    /**
+     * Decode the kv encoding type from the schema info.
+     *
+     * @param schemaInfo the schema info
+     * @return the kv encoding type
+     */
+    public static KeyValueEncodingType decodeKeyValueEncodingType(SchemaInfo schemaInfo) {
+        checkArgument(SchemaType.KEY_VALUE == schemaInfo.getType(),
+            "Not a KeyValue schema");
+
+        String encodingTypeStr = schemaInfo.getProperties().get(KV_ENCODING_TYPE);
+        if (StringUtils.isEmpty(encodingTypeStr)) {
+            return KeyValueEncodingType.INLINE;
+        } else {
+            return KeyValueEncodingType.valueOf(encodingTypeStr);
+        }
+    }
+
+    /**
+     * Encode key & value into schema into a KeyValue schema.
+     *
+     * @param keySchema the key schema
+     * @param valueSchema the value schema
+     * @param keyValueEncodingType the encoding type to encode and decode key value pair
+     * @return the final schema info
+     */
+    public static <K, V> SchemaInfo encodeKeyValueSchemaInfo(Schema<K> keySchema,
+                                                             Schema<V> valueSchema,
+                                                             KeyValueEncodingType keyValueEncodingType) {
+        return encodeKeyValueSchemaInfo(
+            "KeyValue",
+            keySchema,
+            valueSchema,
+            keyValueEncodingType
+        );
+    }
+
+    /**
+     * Encode key & value into schema into a KeyValue schema.
+     *
+     * @param schemaName the final schema name
+     * @param keySchema the key schema
+     * @param valueSchema the value schema
+     * @param keyValueEncodingType the encoding type to encode and decode key value pair
+     * @return the final schema info
+     */
+    public static <K, V> SchemaInfo encodeKeyValueSchemaInfo(String schemaName,
+                                                             Schema<K> keySchema,
+                                                             Schema<V> valueSchema,
+                                                             KeyValueEncodingType keyValueEncodingType) {
+        checkNotNull(keyValueEncodingType, "Null encoding type is provided");
+
+        // process key/value schema data
+        byte[] schemaData = KeyValue.encode(
+            keySchema.getSchemaInfo(),
+            SCHEMA_INFO_WRITER,
+            valueSchema.getSchemaInfo(),
+            SCHEMA_INFO_WRITER
+        );
+
+        // process key/value schema properties
+        Map<String, String> properties = new HashMap<>();
+        encodeSubSchemaInfoToParentSchemaProperties(
+            keySchema,
+            KEY_SCHEMA_NAME,
+            KEY_SCHEMA_TYPE,
+            KEY_SCHEMA_PROPS,
+            properties
+        );
+
+        encodeSubSchemaInfoToParentSchemaProperties(
+            valueSchema,
+            VALUE_SCHEMA_NAME,
+            VALUE_SCHEMA_TYPE,
+            VALUE_SCHEMA_PROPS,
+            properties
+        );
+        properties.put(KV_ENCODING_TYPE, String.valueOf(keyValueEncodingType));
+
+        // generate the final schema info
+        return new SchemaInfo()
+            .setName(schemaName)
+            .setType(SchemaType.KEY_VALUE)
+            .setSchema(schemaData)
+            .setProperties(properties);
+
+    }
+
+    private static void encodeSubSchemaInfoToParentSchemaProperties(Schema<?> schema,
+                                                                    String schemaNameProperty,
+                                                                    String schemaTypeProperty,
+                                                                    String schemaPropsProperty,
+                                                                    Map<String, String> parentSchemaProperties) {
+        SchemaInfo schemaInfo = schema.getSchemaInfo();
+
+        parentSchemaProperties.put(schemaNameProperty, schemaInfo.getName());
+        parentSchemaProperties.put(schemaTypeProperty, String.valueOf(schemaInfo.getType()));
+        parentSchemaProperties.put(
+            schemaPropsProperty,
+            SchemaUtils.serializeSchemaProperties(schemaInfo.getProperties()));
+    }
+
+    /**
+     * Decode the key/value schema info to get key schema info and value schema info.
+     *
+     * @param schemaInfo key/value schema info.
+     * @return the pair of key schema info and value schema info
+     */
+    public static KeyValue<SchemaInfo, SchemaInfo> decodeKeyValueSchemaInfo(SchemaInfo schemaInfo) {
+        checkArgument(SchemaType.KEY_VALUE == schemaInfo.getType(),
+            "Not a KeyValue schema");
+
+        return KeyValue.decode(
+            schemaInfo.getSchema(),
+            (keyBytes, valueBytes) -> {
+                SchemaInfo keySchemaInfo = decodeSubSchemaInfo(
+                    schemaInfo,
+                    KEY_SCHEMA_NAME,
+                    KEY_SCHEMA_TYPE,
+                    KEY_SCHEMA_PROPS,
+                    keyBytes
+                );
+
+                SchemaInfo valueSchemaInfo = decodeSubSchemaInfo(
+                    schemaInfo,
+                    VALUE_SCHEMA_NAME,
+                    VALUE_SCHEMA_TYPE,
+                    VALUE_SCHEMA_PROPS,
+                    valueBytes
+                );
+                return new KeyValue<>(keySchemaInfo, valueSchemaInfo);
+            }
+        );
+    }
+
+    private static SchemaInfo decodeSubSchemaInfo(SchemaInfo parentSchemaInfo,
+                                                  String schemaNameProperty,
+                                                  String schemaTypeProperty,
+                                                  String schemaPropsProperty,
+                                                  byte[] schemaData) {
+        Map<String, String> parentSchemaProps = parentSchemaInfo.getProperties();
+        String schemaName = parentSchemaProps.getOrDefault(schemaNameProperty, "");
+        SchemaType schemaType =
+            SchemaType.valueOf(parentSchemaProps.getOrDefault(schemaTypeProperty, SchemaType.BYTES.name()));
+        Map<String, String> schemaProps;
+        String schemaPropsStr = parentSchemaProps.get(schemaPropsProperty);
+        if (StringUtils.isEmpty(schemaPropsStr)) {
+            schemaProps = Collections.emptyMap();
+        } else {
+            schemaProps = SchemaUtils.deserializeSchemaProperties(schemaPropsStr);
+        }
+        return SchemaInfo.builder()
+            .name(schemaName)
+            .type(schemaType)
+            .schema(schemaData)
+            .properties(schemaProps)
+            .build();
+    }
+
+    private KeyValueSchemaInfo() {}
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
index 3ba36b1..83b23c9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
@@ -18,14 +18,30 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 /**
@@ -168,4 +184,145 @@ public final class SchemaUtils {
 
     }
 
+    /**
+     * Jsonify the schema info.
+     *
+     * @param schemaInfo the schema info
+     * @return the jsonified schema info
+     */
+    public static String jsonifySchemaInfo(SchemaInfo schemaInfo) {
+        GsonBuilder gsonBuilder = new GsonBuilder()
+            .setPrettyPrinting()
+            .registerTypeHierarchyAdapter(byte[].class, new ByteArrayToStringAdapter(schemaInfo))
+            .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_SERIALIZER);
+
+        return gsonBuilder.create().toJson(schemaInfo);
+    }
+
+    private static class SchemaPropertiesSerializer implements JsonSerializer<Map<String, String>> {
+
+        @Override
+        public JsonElement serialize(Map<String, String> properties,
+                                     Type type,
+                                     JsonSerializationContext jsonSerializationContext) {
+            SortedMap<String, String> sortedProperties = new TreeMap<>();
+            sortedProperties.putAll(properties);
+            JsonObject object = new JsonObject();
+            sortedProperties.forEach((key, value) -> {
+                object.add(key, new JsonPrimitive(value));
+            });
+            return object;
+        }
+
+    }
+
+    private static class SchemaPropertiesDeserializer implements JsonDeserializer<Map<String, String>> {
+
+        @Override
+        public Map<String, String> deserialize(JsonElement jsonElement,
+                                               Type type,
+                                               JsonDeserializationContext jsonDeserializationContext)
+            throws JsonParseException {
+
+            SortedMap<String, String> sortedProperties = new TreeMap<>();
+            jsonElement.getAsJsonObject().entrySet().forEach(entry -> sortedProperties.put(
+                entry.getKey(),
+                entry.getValue().getAsString()
+            ));
+            return sortedProperties;
+        }
+
+    }
+
+    private static final SchemaPropertiesSerializer SCHEMA_PROPERTIES_SERIALIZER =
+        new SchemaPropertiesSerializer();
+
+    private static final SchemaPropertiesDeserializer SCHEMA_PROPERTIES_DESERIALIZER =
+        new SchemaPropertiesDeserializer();
+
+    private static class ByteArrayToStringAdapter implements JsonSerializer<byte[]> {
+
+        private final SchemaInfo schemaInfo;
+
+        public ByteArrayToStringAdapter(SchemaInfo schemaInfo) {
+            this.schemaInfo = schemaInfo;
+        }
+
+        public JsonElement serialize(byte[] src, Type typeOfSrc, JsonSerializationContext context) {
+            String schemaDef = schemaInfo.getSchemaDefinition();
+            SchemaType type = schemaInfo.getType();
+            switch (type) {
+                case AVRO:
+                case JSON:
+                case PROTOBUF:
+                    return toJsonObject(schemaInfo.getSchemaDefinition());
+                case KEY_VALUE:
+                    KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+                        DefaultImplementation.decodeKeyValueSchemaInfo(schemaInfo);
+                    JsonObject obj = new JsonObject();
+                    String keyJson = jsonifySchemaInfo(schemaInfoKeyValue.getKey());
+                    String valueJson = jsonifySchemaInfo(schemaInfoKeyValue.getValue());
+                    obj.add("key", toJsonObject(keyJson));
+                    obj.add("value", toJsonObject(valueJson));
+                    return obj;
+                default:
+                    return new JsonPrimitive(schemaDef);
+            }
+        }
+    }
+
+    private static JsonObject toJsonObject(String json) {
+        JsonParser parser = new JsonParser();
+        return parser.parse(json).getAsJsonObject();
+    }
+
+    private static class SchemaInfoToStringAdapter implements JsonSerializer<SchemaInfo> {
+
+        @Override
+        public JsonElement serialize(SchemaInfo schemaInfo,
+                                     Type type,
+                                     JsonSerializationContext jsonSerializationContext) {
+            return toJsonObject(jsonifySchemaInfo(schemaInfo));
+        }
+    }
+
+    private static final SchemaInfoToStringAdapter SCHEMAINFO_ADAPTER = new SchemaInfoToStringAdapter();
+
+    /**
+     * Jsonify the key/value schema info.
+     *
+     * @param kvSchemaInfo the key/value schema info
+     * @return the jsonified schema info
+     */
+    public static String jsonifyKeyValueSchemaInfo(KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo) {
+        GsonBuilder gsonBuilder = new GsonBuilder()
+            .registerTypeHierarchyAdapter(SchemaInfo.class, SCHEMAINFO_ADAPTER)
+            .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_SERIALIZER);
+        return gsonBuilder.create().toJson(kvSchemaInfo);
+    }
+
+    /**
+     * Serialize schema properties
+     *
+     * @param properties schema properties
+     * @return the serialized schema properties
+     */
+    public static String serializeSchemaProperties(Map<String, String> properties) {
+        GsonBuilder gsonBuilder = new GsonBuilder()
+            .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_SERIALIZER);
+        return gsonBuilder.create().toJson(properties);
+    }
+
+    /**
+     * Deserialize schema properties from a serialized schema properties.
+     *
+     * @param serializedProperties serialized properties
+     * @return the deserialized properties
+     */
+    public static Map<String, String> deserializeSchemaProperties(String serializedProperties) {
+        GsonBuilder gsonBuilder = new GsonBuilder()
+            .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_DESERIALIZER);
+        return gsonBuilder.create().fromJson(serializedProperties, Map.class);
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
new file mode 100644
index 0000000..a2e0f46
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link KeyValueSchemaInfoTest}.
+ */
+@Slf4j
+public class KeyValueSchemaInfoTest {
+
+    private static final Map<String, String> FOO_PROPERTIES = new HashMap() {
+
+        private static final long serialVersionUID = 58641844834472929L;
+
+        {
+            put("foo1", "foo-value1");
+            put("foo2", "foo-value2");
+            put("foo3", "foo-value3");
+        }
+
+    };
+
+    private static final Map<String, String> BAR_PROPERTIES = new HashMap() {
+
+        private static final long serialVersionUID = 58641844834472929L;
+
+        {
+            put("bar1", "bar-value1");
+            put("bar2", "bar-value2");
+            put("bar3", "bar-value3");
+        }
+
+    };
+
+    public static final Schema<Foo> FOO_SCHEMA =
+        Schema.AVRO(SchemaDefinition.<Foo>builder()
+            .withAlwaysAllowNull(false)
+            .withPojo(Foo.class)
+            .withProperties(FOO_PROPERTIES)
+            .build()
+        );
+    public static final Schema<Bar> BAR_SCHEMA =
+        Schema.JSON(SchemaDefinition.<Bar>builder()
+            .withAlwaysAllowNull(true)
+            .withPojo(Bar.class)
+            .withProperties(BAR_PROPERTIES)
+            .build());
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testDecodeNonKeyValueSchemaInfo() {
+        DefaultImplementation.decodeKeyValueSchemaInfo(
+            FOO_SCHEMA.getSchemaInfo()
+        );
+    }
+
+    @DataProvider(name = "encodingTypes")
+    public Object[][] encodingTypes() {
+        return new Object[][] {
+            { KeyValueEncodingType.INLINE },
+            { KeyValueEncodingType.SEPARATED },
+        };
+    }
+
+    @Test(dataProvider = "encodingTypes")
+    public void encodeDecodeKeyValueSchemaInfo(KeyValueEncodingType encodingType) {
+        Schema<KeyValue<Foo, Bar>> kvSchema = Schema.KeyValue(
+            FOO_SCHEMA,
+            BAR_SCHEMA,
+            encodingType
+        );
+        SchemaInfo kvSchemaInfo = kvSchema.getSchemaInfo();
+        assertEquals(
+            DefaultImplementation.decodeKeyValueEncodingType(kvSchemaInfo),
+            encodingType);
+
+        SchemaInfo encodedSchemaInfo =
+            DefaultImplementation.encodeKeyValueSchemaInfo(FOO_SCHEMA, BAR_SCHEMA, encodingType);
+        assertEquals(encodedSchemaInfo, kvSchemaInfo);
+        assertEquals(
+            DefaultImplementation.decodeKeyValueEncodingType(encodedSchemaInfo),
+            encodingType);
+
+        KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+            DefaultImplementation.decodeKeyValueSchemaInfo(kvSchemaInfo);
+
+        assertEquals(schemaInfoKeyValue.getKey(), FOO_SCHEMA.getSchemaInfo());
+        assertEquals(schemaInfoKeyValue.getValue(), BAR_SCHEMA.getSchemaInfo());
+    }
+
+    @Test(dataProvider = "encodingTypes")
+    public void encodeDecodeNestedKeyValueSchemaInfo(KeyValueEncodingType encodingType) {
+        Schema<KeyValue<String, Bar>> nestedSchema =
+            Schema.KeyValue(Schema.STRING, BAR_SCHEMA, KeyValueEncodingType.INLINE);
+        Schema<KeyValue<Foo, KeyValue<String, Bar>>> kvSchema = Schema.KeyValue(
+            FOO_SCHEMA,
+            nestedSchema,
+            encodingType
+        );
+        SchemaInfo kvSchemaInfo = kvSchema.getSchemaInfo();
+        assertEquals(
+            DefaultImplementation.decodeKeyValueEncodingType(kvSchemaInfo),
+            encodingType);
+
+        SchemaInfo encodedSchemaInfo =
+            DefaultImplementation.encodeKeyValueSchemaInfo(
+                FOO_SCHEMA,
+                nestedSchema,
+                encodingType);
+        assertEquals(encodedSchemaInfo, kvSchemaInfo);
+        assertEquals(
+            DefaultImplementation.decodeKeyValueEncodingType(encodedSchemaInfo),
+            encodingType);
+
+        KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+            DefaultImplementation.decodeKeyValueSchemaInfo(kvSchemaInfo);
+
+        assertEquals(schemaInfoKeyValue.getKey(), FOO_SCHEMA.getSchemaInfo());
+        assertEquals(schemaInfoKeyValue.getValue().getType(), SchemaType.KEY_VALUE);
+        KeyValue<SchemaInfo, SchemaInfo> nestedSchemaInfoKeyValue =
+            DefaultImplementation.decodeKeyValueSchemaInfo(schemaInfoKeyValue.getValue());
+
+        assertEquals(nestedSchemaInfoKeyValue.getKey(), Schema.STRING.getSchemaInfo());
+        assertEquals(nestedSchemaInfoKeyValue.getValue(), BAR_SCHEMA.getSchemaInfo());
+    }
+
+    @Test
+    public void testKeyValueSchemaInfoBackwardCompatibility() {
+        Schema<KeyValue<Foo, Bar>> kvSchema = Schema.KeyValue(
+            FOO_SCHEMA,
+            BAR_SCHEMA,
+            KeyValueEncodingType.SEPARATED
+        );
+
+        SchemaInfo oldSchemaInfo = new SchemaInfo()
+            .setName("")
+            .setType(SchemaType.KEY_VALUE)
+            .setSchema(kvSchema.getSchemaInfo().getSchema())
+            .setProperties(Collections.emptyMap());
+
+        assertEquals(
+            DefaultImplementation.decodeKeyValueEncodingType(oldSchemaInfo),
+            KeyValueEncodingType.INLINE);
+
+        KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+            DefaultImplementation.decodeKeyValueSchemaInfo(oldSchemaInfo);
+        // verify the key schema
+        SchemaInfo keySchemaInfo = schemaInfoKeyValue.getKey();
+        assertEquals(
+            SchemaType.BYTES, keySchemaInfo.getType()
+        );
+        assertArrayEquals(
+            "Expected schema = " + FOO_SCHEMA.getSchemaInfo().getSchemaDefinition()
+                + " but found " + keySchemaInfo.getSchemaDefinition(),
+            FOO_SCHEMA.getSchemaInfo().getSchema(),
+            keySchemaInfo.getSchema()
+        );
+        assertFalse(FOO_SCHEMA.getSchemaInfo().getProperties().isEmpty());
+        assertTrue(keySchemaInfo.getProperties().isEmpty());
+        // verify the value schema
+        SchemaInfo valueSchemaInfo = schemaInfoKeyValue.getValue();
+        assertEquals(
+            SchemaType.BYTES, valueSchemaInfo.getType()
+        );
+        assertArrayEquals(
+            BAR_SCHEMA.getSchemaInfo().getSchema(),
+            valueSchemaInfo.getSchema()
+        );
+        assertFalse(BAR_SCHEMA.getSchemaInfo().getProperties().isEmpty());
+        assertTrue(valueSchemaInfo.getProperties().isEmpty());
+    }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
new file mode 100644
index 0000000..bacf22b
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link org.apache.pulsar.common.schema.SchemaInfo}.
+ */
+public class SchemaInfoTest {
+
+    private static final String INT32_SCHEMA_INFO = "{\n"
+        + "  \"name\": \"INT32\",\n"
+        + "  \"schema\": \"\",\n"
+        + "  \"type\": \"INT32\",\n"
+        + "  \"properties\": {}\n"
+        + "}";
+
+    private static final String UTF8_SCHEMA_INFO = "{\n"
+        + "  \"name\": \"String\",\n"
+        + "  \"schema\": \"\",\n"
+        + "  \"type\": \"STRING\",\n"
+        + "  \"properties\": {}\n"
+        + "}";
+
+    private static final String BAR_SCHEMA_INFO = "{\n"
+        + "  \"name\": \"\",\n"
+        + "  \"schema\": {\n"
+        + "    \"type\": \"record\",\n"
+        + "    \"name\": \"Bar\",\n"
+        + "    \"namespace\": \"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\n"
+        + "    \"fields\": [\n"
+        + "      {\n"
+        + "        \"name\": \"field1\",\n"
+        + "        \"type\": \"boolean\"\n"
+        + "      }\n"
+        + "    ]\n"
+        + "  },\n"
+        + "  \"type\": \"JSON\",\n"
+        + "  \"properties\": {\n"
+        + "    \"__alwaysAllowNull\": \"true\",\n"
+        + "    \"bar1\": \"bar-value1\",\n"
+        + "    \"bar2\": \"bar-value2\",\n"
+        + "    \"bar3\": \"bar-value3\"\n"
+        + "  }\n"
+        + "}";
+
+    private static final String FOO_SCHEMA_INFO = "{\n"
+        + "  \"name\": \"\",\n"
+        + "  \"schema\": {\n"
+        + "    \"type\": \"record\",\n"
+        + "    \"name\": \"Foo\",\n"
+        + "    \"namespace\": \"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\n"
+        + "    \"fields\": [\n"
+        + "      {\n"
+        + "        \"name\": \"field1\",\n"
+        + "        \"type\": [\n"
+        + "          \"null\",\n"
+        + "          \"string\"\n"
+        + "        ]\n"
+        + "      },\n"
+        + "      {\n"
+        + "        \"name\": \"field2\",\n"
+        + "        \"type\": [\n"
+        + "          \"null\",\n"
+        + "          \"string\"\n"
+        + "        ]\n"
+        + "      },\n"
+        + "      {\n"
+        + "        \"name\": \"field3\",\n"
+        + "        \"type\": \"int\"\n"
+        + "      },\n"
+        + "      {\n"
+        + "        \"name\": \"field4\",\n"
+        + "        \"type\": [\n"
+        + "          \"null\",\n"
+        + "          {\n"
+        + "            \"type\": \"record\",\n"
+        + "            \"name\": \"Bar\",\n"
+        + "            \"fields\": [\n"
+        + "              {\n"
+        + "                \"name\": \"field1\",\n"
+        + "                \"type\": \"boolean\"\n"
+        + "              }\n"
+        + "            ]\n"
+        + "          }\n"
+        + "        ]\n"
+        + "      },\n"
+        + "      {\n"
+        + "        \"name\": \"color\",\n"
+        + "        \"type\": [\n"
+        + "          \"null\",\n"
+        + "          {\n"
+        + "            \"type\": \"enum\",\n"
+        + "            \"name\": \"Color\",\n"
+        + "            \"symbols\": [\n"
+        + "              \"RED\",\n"
+        + "              \"BLUE\"\n"
+        + "            ]\n"
+        + "          }\n"
+        + "        ]\n"
+        + "      },\n"
+        + "      {\n"
+        + "        \"name\": \"fieldUnableNull\",\n"
+        + "        \"type\": \"string\",\n"
+        + "        \"default\": \"defaultValue\"\n"
+        + "      }\n"
+        + "    ]\n"
+        + "  },\n"
+        + "  \"type\": \"AVRO\",\n"
+        + "  \"properties\": {\n"
+        + "    \"__alwaysAllowNull\": \"false\",\n"
+        + "    \"foo1\": \"foo-value1\",\n"
+        + "    \"foo2\": \"foo-value2\",\n"
+        + "    \"foo3\": \"foo-value3\"\n"
+        + "  }\n"
+        + "}";
+
+    private static final String KV_SCHEMA_INFO = "{\n"
+        + "  \"name\": \"KeyValue\",\n"
+        + "  \"schema\": {\n"
+        + "    \"key\": {\n"
+        + "      \"name\": \"\",\n"
+        + "      \"schema\": {\n"
+        + "        \"type\": \"record\",\n"
+        + "        \"name\": \"Foo\",\n"
+        + "        \"namespace\": \"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\n"
+        + "        \"fields\": [\n"
+        + "          {\n"
+        + "            \"name\": \"field1\",\n"
+        + "            \"type\": [\n"
+        + "              \"null\",\n"
+        + "              \"string\"\n"
+        + "            ]\n"
+        + "          },\n"
+        + "          {\n"
+        + "            \"name\": \"field2\",\n"
+        + "            \"type\": [\n"
+        + "              \"null\",\n"
+        + "              \"string\"\n"
+        + "            ]\n"
+        + "          },\n"
+        + "          {\n"
+        + "            \"name\": \"field3\",\n"
+        + "            \"type\": \"int\"\n"
+        + "          },\n"
+        + "          {\n"
+        + "            \"name\": \"field4\",\n"
+        + "            \"type\": [\n"
+        + "              \"null\",\n"
+        + "              {\n"
+        + "                \"type\": \"record\",\n"
+        + "                \"name\": \"Bar\",\n"
+        + "                \"fields\": [\n"
+        + "                  {\n"
+        + "                    \"name\": \"field1\",\n"
+        + "                    \"type\": \"boolean\"\n"
+        + "                  }\n"
+        + "                ]\n"
+        + "              }\n"
+        + "            ]\n"
+        + "          },\n"
+        + "          {\n"
+        + "            \"name\": \"color\",\n"
+        + "            \"type\": [\n"
+        + "              \"null\",\n"
+        + "              {\n"
+        + "                \"type\": \"enum\",\n"
+        + "                \"name\": \"Color\",\n"
+        + "                \"symbols\": [\n"
+        + "                  \"RED\",\n"
+        + "                  \"BLUE\"\n"
+        + "                ]\n"
+        + "              }\n"
+        + "            ]\n"
+        + "          },\n"
+        + "          {\n"
+        + "            \"name\": \"fieldUnableNull\",\n"
+        + "            \"type\": \"string\",\n"
+        + "            \"default\": \"defaultValue\"\n"
+        + "          }\n"
+        + "        ]\n"
+        + "      },\n"
+        + "      \"type\": \"AVRO\",\n"
+        + "      \"properties\": {\n"
+        + "        \"__alwaysAllowNull\": \"false\",\n"
+        + "        \"foo1\": \"foo-value1\",\n"
+        + "        \"foo2\": \"foo-value2\",\n"
+        + "        \"foo3\": \"foo-value3\"\n"
+        + "      }\n"
+        + "    },\n"
+        + "    \"value\": {\n"
+        + "      \"name\": \"\",\n"
+        + "      \"schema\": {\n"
+        + "        \"type\": \"record\",\n"
+        + "        \"name\": \"Bar\",\n"
+        + "        \"namespace\": \"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\n"
+        + "        \"fields\": [\n"
+        + "          {\n"
+        + "            \"name\": \"field1\",\n"
+        + "            \"type\": \"boolean\"\n"
+        + "          }\n"
+        + "        ]\n"
+        + "      },\n"
+        + "      \"type\": \"JSON\",\n"
+        + "      \"properties\": {\n"
+        + "        \"__alwaysAllowNull\": \"true\",\n"
+        + "        \"bar1\": \"bar-value1\",\n"
+        + "        \"bar2\": \"bar-value2\",\n"
+        + "        \"bar3\": \"bar-value3\"\n"
+        + "      }\n"
+        + "    }\n"
+        + "  },\n"
+        + "  \"type\": \"KEY_VALUE\",\n"
+        + "  \"properties\": {\n"
+        + "    \"key.schema.name\": \"\",\n"
+        + "    \"key.schema.properties\": \"{\\\"__alwaysAllowNull\\\":\\\"false\\\",\\\"foo1\\\":\\\"foo-value1\\\",\\\"foo2\\\":\\\"foo-value2\\\",\\\"foo3\\\":\\\"foo-value3\\\"}\",\n"
+        + "    \"key.schema.type\": \"AVRO\",\n"
+        + "    \"kv.encoding.type\": \"SEPARATED\",\n"
+        + "    \"value.schema.name\": \"\",\n"
+        + "    \"value.schema.properties\": \"{\\\"__alwaysAllowNull\\\":\\\"true\\\",\\\"bar1\\\":\\\"bar-value1\\\",\\\"bar2\\\":\\\"bar-value2\\\",\\\"bar3\\\":\\\"bar-value3\\\"}\",\n"
+        + "    \"value.schema.type\": \"JSON\"\n"
+        + "  }\n"
+        + "}";
+
+    @DataProvider(name = "schemas")
+    public Object[][] schemas() {
+        return new Object[][] {
+            {
+                Schema.STRING.getSchemaInfo(), UTF8_SCHEMA_INFO
+            },
+            {
+                Schema.INT32.getSchemaInfo(), INT32_SCHEMA_INFO
+            },
+            {
+                KeyValueSchemaInfoTest.FOO_SCHEMA.getSchemaInfo(), FOO_SCHEMA_INFO
+            },
+            {
+                KeyValueSchemaInfoTest.BAR_SCHEMA.getSchemaInfo(), BAR_SCHEMA_INFO
+            },
+            {
+                Schema.KeyValue(
+                    KeyValueSchemaInfoTest.FOO_SCHEMA,
+                    KeyValueSchemaInfoTest.BAR_SCHEMA,
+                    KeyValueEncodingType.SEPARATED
+                ).getSchemaInfo(),
+                KV_SCHEMA_INFO
+            }
+        };
+    }
+
+    @Test(dataProvider = "schemas")
+    public void testSchemaInfoToString(SchemaInfo si, String jsonifiedStr) {
+        assertEquals(si.toString(), jsonifiedStr);
+    }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
new file mode 100644
index 0000000..7f6f425
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.BooleanSchema;
+import org.apache.pulsar.client.impl.schema.ByteBufSchema;
+import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DateSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.TimeSchema;
+import org.apache.pulsar.client.impl.schema.TimestampSchema;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test
+public class KeyValueTest {
+
+    private static final Map<Schema, List<Object>> testData = new HashMap() {
+
+        private static final long serialVersionUID = -3081991052949960650L;
+
+        {
+            put(BooleanSchema.of(), Arrays.asList(false, true));
+            put(StringSchema.utf8(), Arrays.asList("my string"));
+            put(ByteSchema.of(), Arrays.asList((byte) 32767, (byte) -32768));
+            put(ShortSchema.of(), Arrays.asList((short) 32767, (short) -32768));
+            put(IntSchema.of(), Arrays.asList((int) 423412424, (int) -41243432));
+            put(LongSchema.of(), Arrays.asList(922337203685477580L, -922337203685477581L));
+            put(FloatSchema.of(), Arrays.asList(5678567.12312f, -5678567.12341f));
+            put(DoubleSchema.of(), Arrays.asList(5678567.12312d, -5678567.12341d));
+            put(BytesSchema.of(), Arrays.asList("my string".getBytes(UTF_8)));
+            put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(UTF_8))));
+            put(ByteBufSchema.of(), Arrays.asList(Unpooled.wrappedBuffer("my string".getBytes(UTF_8))));
+            put(DateSchema.of(), Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
+            put(TimeSchema.of(), Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
+            put(TimestampSchema.of(), Arrays.asList(new Timestamp(new java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
+        }
+    };
+
+    @DataProvider(name = "schemas")
+    public Object[][] schemas() {
+        return new Object[][] {
+            { testData }
+        };
+    }
+
+    @Test(dataProvider = "schemas")
+    public void testAllSchemas(Map<Schema, List<Object>> schemas) {
+        for (Map.Entry<Schema, List<Object>> keyEntry : schemas.entrySet()) {
+            for (Map.Entry<Schema, List<Object>> valueEntry : schemas.entrySet()) {
+                testEncodeDecodeKeyValue(
+                    keyEntry.getKey(),
+                    valueEntry.getKey(),
+                    keyEntry.getValue(),
+                    valueEntry.getValue()
+                );
+            }
+        }
+    }
+
+    private <K, V> void testEncodeDecodeKeyValue(Schema<K> keySchema,
+                                                 Schema<V> valueSchema,
+                                                 List<K> keys,
+                                                 List<V> values) {
+        for (K key : keys) {
+            for (V value : values) {
+                byte[] data = KeyValue.encode(
+                    key, keySchema,
+                    value, valueSchema
+                );
+
+                KeyValue<K, V> kv = KeyValue.decode(
+                    data,
+                    (keyBytes, valueBytes) -> new KeyValue<>(
+                        keySchema.decode(keyBytes),
+                        valueSchema.decode(valueBytes)
+                    )
+                );
+
+                assertEquals(kv.getKey(), key);
+                assertEquals(kv.getValue(), value);
+            }
+        }
+    }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
index e68603e..d534811 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import lombok.Builder;
 import lombok.Data;
 import lombok.ToString;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 @Builder
@@ -36,4 +37,33 @@ public class SchemaData {
     private final byte[] data;
     @Builder.Default
     private Map<String, String> props = new HashMap<>();
+
+    /**
+     * Convert a schema data to a schema info.
+     *
+     * @return the converted schema info.
+     */
+    public SchemaInfo toSchemaInfo() {
+        return SchemaInfo.builder()
+            .name("")
+            .type(type)
+            .schema(data)
+            .properties(props)
+            .build();
+    }
+
+    /**
+     * Convert a schema info to a schema data
+     *
+     * @param schemaInfo schema info
+     * @return the converted schema schema data
+     */
+    public static SchemaData fromSchemaInfo(SchemaInfo schemaInfo) {
+        return SchemaData.builder()
+            .type(schemaInfo.getType())
+            .data(schemaInfo.getSchema())
+            .props(schemaInfo.getProperties())
+            .build();
+    }
+
 }
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 2a400ae..3af34d7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -159,7 +159,7 @@ public class CLITest extends PulsarTestSuite {
             "schemas",
             "get",
             topicName);
-        assertTrue(result.getStdout().contains("\"type\":\"STRING\""));
+        assertTrue(result.getStdout().contains("\"type\": \"STRING\""));
 
         // delete the schema
         result = container.execCmd(