You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/16 12:44:37 UTC

[pulsar] branch master updated: Add schema admin api get schema info with schema version (#4877)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f859da9  Add schema admin api  get schema info with schema version (#4877)
f859da9 is described below

commit f859da9e0e290c49c080f046d34d9a64334d8c68
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Aug 16 20:44:30 2019 +0800

    Add schema admin api  get schema info with schema version (#4877)
    
    ### Motivation
    
    To fix #4854 and support get keyValueSchema
---
 .../pulsar/broker/admin/v2/SchemasResource.java    | 12 +++-
 .../pulsar/broker/admin/AdminApiSchemaTest.java    | 26 ++++++++
 .../org/apache/pulsar/client/admin/Schemas.java    | 10 ++++
 .../pulsar/client/admin/internal/SchemasImpl.java  | 35 ++++++++++-
 .../client/internal/DefaultImplementation.java     | 40 +++++++++++++
 .../common/schema/SchemaInfoWithVersion.java       | 43 ++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdSchemas.java    |  7 +--
 .../pulsar/client/impl/schema/SchemaUtils.java     | 69 +++++++++++++++++++++-
 .../client/impl/schema/KeyValueSchemaInfoTest.java | 18 +++++-
 .../pulsar/client/impl/schema/SchemaTestUtils.java | 11 ++++
 10 files changed, 262 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 0e68972..2cff0db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -25,6 +25,7 @@ import static org.apache.pulsar.common.util.Codec.decode;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -58,6 +59,7 @@ import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
@@ -461,11 +463,19 @@ public class SchemasResource extends AdminResource {
     }
 
     private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(SchemaAndMetadata schemaAndMetadata) {
+        String schemaData;
+        if (schemaAndMetadata.schema.getType() == SchemaType.KEY_VALUE) {
+            schemaData = DefaultImplementation
+                    .convertKeyValueSchemaInfoDataToString(DefaultImplementation.decodeKeyValueSchemaInfo
+                            (schemaAndMetadata.schema.toSchemaInfo()));
+        } else {
+            schemaData = new String(schemaAndMetadata.schema.getData(), UTF_8);
+        }
         return GetSchemaResponse.builder()
                 .version(getLongSchemaVersion(schemaAndMetadata.version))
                 .type(schemaAndMetadata.schema.getType())
                 .timestamp(schemaAndMetadata.schema.getTimestamp())
-                .data(new String(schemaAndMetadata.schema.getData(), UTF_8))
+                .data(schemaData)
                 .properties(schemaAndMetadata.schema.getProps())
                 .build();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index c24058a..4995126 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.impl.schema.StringSchema;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -111,6 +112,11 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
         testSchemaInfoApi(schema, "schematest/test/test-" + schema.getSchemaInfo().getType());
     }
 
+    @Test(dataProvider = "schemas")
+    public void testSchemaInfoWithVersionApi(Schema<?> schema) throws Exception {
+        testSchemaInfoWithVersionApi(schema, "schematest/test/test-" + schema.getSchemaInfo().getType());
+    }
+
     private <T> void testSchemaInfoApi(Schema<T> schema,
                                        String topicName) throws Exception {
         SchemaInfo si = schema.getSchemaInfo();
@@ -128,4 +134,24 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
         assertEquals(si, readSi);
 
     }
+
+    private <T> void testSchemaInfoWithVersionApi(Schema<T> schema,
+                                       String topicName) throws Exception {
+        SchemaInfo si = schema.getSchemaInfo();
+        admin.schemas().createSchema(topicName, si);
+        log.info("Upload schema to topic {} : {}", topicName, si);
+
+        SchemaInfoWithVersion readSi = admin.schemas().getSchemaInfoWithVersion(topicName);
+        log.info("Read schema of topic {} : {}", topicName, readSi);
+
+        assertEquals(si, readSi.getSchemaInfo());
+        assertEquals(0, readSi.getVersion());
+
+        readSi = admin.schemas().getSchemaInfoWithVersion(topicName + "-partition-0");
+        log.info("Read schema of topic {} : {}", topicName, readSi);
+
+        assertEquals(si, readSi.getSchemaInfo());
+        assertEquals(0, readSi.getVersion());
+
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
index fa7dcca..2d4173c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
 
 import java.util.List;
 
@@ -42,6 +43,15 @@ public interface Schemas {
     SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException;
 
     /**
+     * Retrieve the latest schema with verison of a topic.
+     *
+     * @param topic topic name, in fully qualified format
+     * @return latest schema with version
+     * @throws PulsarAdminException
+     */
+    SchemaInfoWithVersion getSchemaInfoWithVersion(String topic) throws PulsarAdminException;
+
+    /**
      * Retrieve the schema of a topic at a given <tt>version</tt>.
      *
      * @param topic topic name, in fully qualified format
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 868a807..6ca2c18 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -22,9 +22,11 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
+
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
@@ -34,6 +36,8 @@ import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
 import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
+import org.apache.pulsar.common.schema.SchemaType;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -59,6 +63,17 @@ public class SchemasImpl extends BaseResource implements Schemas {
     }
 
     @Override
+    public SchemaInfoWithVersion getSchemaInfoWithVersion(String topic) throws PulsarAdminException {
+        try {
+            TopicName tn = TopicName.get(topic);
+            GetSchemaResponse response = request(schemaPath(tn)).get(GetSchemaResponse.class);
+            return convertGetSchemaResponseToSchemaInfoWithVersion(tn, response);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException {
         try {
             TopicName tn = TopicName.get(topic);
@@ -187,13 +202,31 @@ public class SchemasImpl extends BaseResource implements Schemas {
     static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
                                                            GetSchemaResponse response) {
         SchemaInfo info = new SchemaInfo();
-        info.setSchema(response.getData().getBytes(UTF_8));
+        byte[] schema;
+        if (response.getType() == SchemaType.KEY_VALUE) {
+            schema = DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema(response.getData().getBytes(UTF_8));
+        } else {
+            schema = response.getData().getBytes(UTF_8);
+        }
+        info.setSchema(schema);
         info.setType(response.getType());
         info.setProperties(response.getProperties());
         info.setName(tn.getLocalName());
         return info;
     }
 
+    static SchemaInfoWithVersion convertGetSchemaResponseToSchemaInfoWithVersion(TopicName tn,
+                                                           GetSchemaResponse response) {
+
+        return  SchemaInfoWithVersion
+                .builder()
+                .schemaInfo(convertGetSchemaResponseToSchemaInfo(tn, response))
+                .version(response.getVersion())
+                .build();
+    }
+
+
+
 
     // the util function exists for backward compatibility concern
     static String convertSchemaDataToStringLegacy(byte[] schemaData) {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index c8418b9..f2bc348 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 
 @SuppressWarnings("unchecked")
@@ -361,6 +362,19 @@ public class DefaultImplementation {
     }
 
     /**
+     * Jsonify the schema info with version.
+     *
+     * @param schemaInfoWithVersion the schema info with version
+     * @return the jsonified schema info with version
+     */
+    public static String jsonifySchemaInfoWithVersion(SchemaInfoWithVersion schemaInfoWithVersion) {
+        return catchExceptions(
+                () -> (String) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
+                        "jsonifySchemaInfoWithVersion", SchemaInfoWithVersion.class
+                ).invoke(null, schemaInfoWithVersion));
+    }
+
+    /**
      * Jsonify the key/value schema info.
      *
      * @param kvSchemaInfo the key/value schema info
@@ -373,6 +387,32 @@ public class DefaultImplementation {
             ).invoke(null, kvSchemaInfo));
     }
 
+    /**
+     * convert the key/value schema data
+     *
+     * @param kvSchemaInfo the key/value schema info
+     * @return the convert key/value schema data string
+     */
+    public static String convertKeyValueSchemaInfoDataToString(KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo) {
+        return catchExceptions(
+                () -> (String) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
+                        "convertKeyValueSchemaInfoDataToString", KeyValue.class
+                ).invoke(null, kvSchemaInfo));
+    }
+
+    /**
+     * convert the key/value schema info data json bytes to key/value schema info data bytes
+     *
+     * @param keyValueSchemaInfoDataJsonBytes the key/value schema info data json bytes
+     * @return the key/value schema info data bytes
+     */
+    public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValueSchemaInfoDataJsonBytes) {
+        return catchExceptions(
+                () -> (byte[]) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
+                        "convertKeyValueDataStringToSchemaInfoSchema", byte[].class
+                ).invoke(null, keyValueSchemaInfoDataJsonBytes));
+    }
+
     public static BatcherBuilder newDefaultBatcherBuilder() {
         return catchExceptions(
             () -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.DefaultBatcherBuilder")
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfoWithVersion.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfoWithVersion.java
new file mode 100644
index 0000000..9cef89b
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfoWithVersion.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Accessors(chain = true)
+@Builder
+public class SchemaInfoWithVersion {
+
+    private long version;
+
+    private SchemaInfo schemaInfo;
+
+    @Override
+    public String toString(){
+        return DefaultImplementation.jsonifySchemaInfoWithVersion(this);
+    }
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
index 9973df8..a863f98 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -27,7 +27,6 @@ import java.net.URLClassLoader;
 import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
-import org.apache.pulsar.common.schema.SchemaInfo;
 
 @Parameters(commandDescription = "Operations about schemas")
 public class CmdSchemas extends CmdBase {
@@ -52,13 +51,11 @@ public class CmdSchemas extends CmdBase {
         @Override
         void run() throws Exception {
             String topic = validateTopicName(params);
-            SchemaInfo schemaInfo;
             if (version == null) {
-                schemaInfo = admin.schemas().getSchemaInfo(topic);
+                System.out.println(admin.schemas().getSchemaInfoWithVersion(topic));
             } else {
-                schemaInfo = admin.schemas().getSchemaInfo(topic, version);
+                System.out.println(admin.schemas().getSchemaInfo(topic, version));
             }
-            System.out.println(schemaInfo);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
index 83b23c9..7d9ae25 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonDeserializationContext;
 import com.google.gson.JsonDeserializer;
@@ -29,7 +30,10 @@ import com.google.gson.JsonPrimitive;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
+
+import java.io.IOException;
 import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -39,16 +43,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 
 /**
  * Utils for schemas.
  */
 public final class SchemaUtils {
 
+    private static final byte[] KEY_VALUE_SCHEMA_IS_PRIMITIVE = new byte[0];
+
+    private static final String KEY_VALUE_SCHEMA_NULL_STRING = "\"\"";
+
     private SchemaUtils() {}
 
     /**
@@ -199,6 +213,21 @@ public final class SchemaUtils {
         return gsonBuilder.create().toJson(schemaInfo);
     }
 
+    /**
+     * Jsonify the schema info with verison.
+     *
+     * @param schemaInfoWithVersion the schema info
+     * @return the jsonified schema info with version
+     */
+    public static String jsonifySchemaInfoWithVersion(SchemaInfoWithVersion schemaInfoWithVersion) {
+        GsonBuilder gsonBuilder = new GsonBuilder()
+                .setPrettyPrinting()
+                .registerTypeHierarchyAdapter(SchemaInfo.class, SCHEMAINFO_ADAPTER)
+                .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_SERIALIZER);
+
+        return gsonBuilder.create().toJson(schemaInfoWithVersion);
+    }
+
     private static class SchemaPropertiesSerializer implements JsonSerializer<Map<String, String>> {
 
         @Override
@@ -271,7 +300,7 @@ public final class SchemaUtils {
         }
     }
 
-    private static JsonObject toJsonObject(String json) {
+    public static JsonObject toJsonObject(String json) {
         JsonParser parser = new JsonParser();
         return parser.parse(json).getAsJsonObject();
     }
@@ -302,6 +331,44 @@ public final class SchemaUtils {
     }
 
     /**
+     * convert the key/value schema info data to string
+     *
+     * @param kvSchemaInfo the key/value schema info
+     * @return the convert schema info data string
+     */
+    public static String convertKeyValueSchemaInfoDataToString(KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo) throws IOException {
+        ObjectMapper objectMapper = ObjectMapperFactory.create();
+        KeyValue<Object, Object> keyValue = new KeyValue<>(SchemaType.isPrimitiveType(kvSchemaInfo.getKey().getType()) ? ""
+                : objectMapper.readTree(kvSchemaInfo.getKey().getSchema()), SchemaType.isPrimitiveType(kvSchemaInfo.getValue().getType()) ?
+                "" : objectMapper.readTree(kvSchemaInfo.getValue().getSchema()));
+        return objectMapper.writeValueAsString(keyValue);
+    }
+
+    private static byte[] getKeyOrValueSchemaBytes(JsonElement jsonElement) {
+        return KEY_VALUE_SCHEMA_NULL_STRING.equals(jsonElement.toString()) ?
+                KEY_VALUE_SCHEMA_IS_PRIMITIVE : jsonElement.toString().getBytes(UTF_8);
+    }
+
+    /**
+     * convert the key/value schema info data json bytes to key/value schema info data bytes
+     *
+     * @param keyValueSchemaInfoDataJsonBytes the key/value schema info data json bytes
+     * @return the key/value schema info data bytes
+     */
+    public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValueSchemaInfoDataJsonBytes) throws IOException {
+        JsonObject jsonObject = toJsonObject(new String(keyValueSchemaInfoDataJsonBytes, UTF_8));
+        byte[] keyBytes = getKeyOrValueSchemaBytes(jsonObject.get("key"));
+        byte[] valueBytes = getKeyOrValueSchemaBytes(jsonObject.get("value"));
+        int dataLength = 4 + keyBytes.length + 4 + valueBytes.length;
+        byte[] schema = new byte[dataLength];
+        //record the key value schema respective length
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.heapBuffer(dataLength);
+        byteBuf.writeInt(keyBytes.length).writeBytes(keyBytes).writeInt(valueBytes.length).writeBytes(valueBytes);
+        byteBuf.readBytes(schema);
+        return schema;
+    }
+
+    /**
      * Serialize schema properties
      *
      * @param properties schema properties
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
index a2e0f46..99a1724 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
@@ -18,12 +18,13 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.KEY_VALUE_SCHEMA_INFO_INCLUDE_PRIMITIVE;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.KEY_VALUE_SCHEMA_INFO_NOT_INCLUDE_PRIMITIVE;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -207,4 +208,19 @@ public class KeyValueSchemaInfoTest {
         assertTrue(valueSchemaInfo.getProperties().isEmpty());
     }
 
+    @Test
+    public void testKeyValueSchemaInfoToString() {
+        String havePrimitiveType = DefaultImplementation
+                .convertKeyValueSchemaInfoDataToString(KeyValueSchemaInfo
+                        .decodeKeyValueSchemaInfo(Schema.KeyValue(Schema.AVRO(Foo.class), Schema.STRING)
+                                .getSchemaInfo()));
+        assertEquals(havePrimitiveType, KEY_VALUE_SCHEMA_INFO_INCLUDE_PRIMITIVE);
+
+        String notHavePrimitiveType = DefaultImplementation
+                .convertKeyValueSchemaInfoDataToString(KeyValueSchemaInfo
+                        .decodeKeyValueSchemaInfo(Schema.KeyValue(Schema.AVRO(Foo.class),
+                                Schema.AVRO(Foo.class)).getSchemaInfo()));
+        assertEquals(notHavePrimitiveType, KEY_VALUE_SCHEMA_INFO_NOT_INCLUDE_PRIMITIVE);
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
index a7abc74..9f7ed21 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
@@ -124,6 +124,17 @@ public class SchemaTestUtils {
             "{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":" +
             "[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":" +
             "\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]}";
+    public static final String KEY_VALUE_SCHEMA_INFO_INCLUDE_PRIMITIVE = "{\"key\":{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"" +
+            "field1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\"," +
+            "{\"type\":\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\"" +
+            ",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]},\"value\":\"\"}";
+    public static final String KEY_VALUE_SCHEMA_INFO_NOT_INCLUDE_PRIMITIVE = "{\"key\":{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\"" +
+            ",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\"" +
+            ",\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null}," +
+            "{\"name\":\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]},\"value\":{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":" +
+            "[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\"," +
+            "{\"type\":\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}]," +
+            "\"default\":null},{\"name\":\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]}}";
 
     public static String[] FOO_FIELDS = {
             "field1",