You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/30 16:33:41 UTC

[pulsar] branch master updated: [pulsar-clients]Store key part of a KeyValue schema into pulsar message keys (#4117)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f21501  [pulsar-clients]Store key part of a KeyValue schema into pulsar message keys (#4117)
7f21501 is described below

commit 7f21501e4477e472d1c474956f52147a698cb535
Author: tuteng <eg...@gmail.com>
AuthorDate: Wed May 1 00:33:34 2019 +0800

    [pulsar-clients]Store key part of a KeyValue schema into pulsar message keys (#4117)
    
    ### Motivation
    
    The current implementation of KeyValue schema stores key and value together as part of message payload. Ideally the key should be stored as part of message key.
    
    It can be done by introducing a property in KeyValue schema to indicate whether store key in payload or as message key.
    
    ### Modifications
    
    * Add keyIsStoredToMessage for encode and decode of KeyValueSchema
    
    ### Verifying this change
    Unit test pass
---
 .../java/org/apache/pulsar/client/api/Schema.java  |   8 +
 .../client/internal/DefaultImplementation.java     |   9 +
 .../pulsar/common/schema/KeyValueEncodingType.java |  29 +++
 .../org/apache/pulsar/client/impl/MessageImpl.java |  10 +
 .../client/impl/TypedMessageBuilderImpl.java       |  27 +++
 .../pulsar/client/impl/schema/KeyValueSchema.java  |  59 +++++-
 .../apache/pulsar/client/impl/MessageImplTest.java |  88 +++++++-
 .../client/impl/TypedMessageBuilderImplTest.java   | 229 +++++++++++++++++++++
 .../client/impl/schema/KeyValueSchemaTest.java     |  95 +++++++++
 9 files changed, 540 insertions(+), 14 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 72de4fe..b55cdbc 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -268,6 +269,13 @@ public interface Schema<T> {
         return DefaultImplementation.newKeyValueSchema(key, value);
     }
 
+    /**
+     * Key Value Schema using passed in key, value and encoding type schemas.
+     */
+    static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value, KeyValueEncodingType keyValueEncodingType) {
+        return DefaultImplementation.newKeyValueSchema(key, value, keyValueEncodingType);
+    }
+
     @Deprecated
     static Schema<GenericRecord> AUTO() {
         return AUTO_CONSUME();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index da48774..d85b278 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.api.schema.*;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -250,6 +251,14 @@ public class DefaultImplementation {
                         "of", Schema.class, Schema.class).invoke(null, keySchema, valueSchema));
     }
 
+    public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, Schema<V> valueSchema,
+                                                                  KeyValueEncodingType keyValueEncodingType) {
+        return catchExceptions(
+                () -> (Schema<KeyValue<K, V>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchema",
+                        "of", Schema.class, Schema.class, KeyValueEncodingType.class)
+                        .invoke(null, keySchema, valueSchema, keyValueEncodingType));
+    }
+
     public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Class<K> key, Class<V> value, SchemaType type) {
         return catchExceptions(
                 () -> (Schema<KeyValue<K, V>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchema",
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValueEncodingType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValueEncodingType.java
new file mode 100644
index 0000000..fa9c7ff
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValueEncodingType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+/**
+ * Encoding types of supported KeyValueSchema for Pulsar messages
+ */
+public enum KeyValueEncodingType {
+    // key is stored as message key, while value is stored as message payload
+    SEPARATED,
+    // key and value are stored as message payload
+    INLINE
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 6b9731b..d239dc1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -41,11 +41,14 @@ import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 
 public class MessageImpl<T> implements Message<T> {
 
@@ -244,6 +247,13 @@ public class MessageImpl<T> implements Message<T> {
         byte [] schemaVersion = getSchemaVersion();
         if (schema.supportSchemaVersioning() && schemaVersion != null) {
             return schema.decode(getData(), schemaVersion);
+        } else if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
+            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+            if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+                return schema.decode(getKeyBytes(), getData());
+            } else {
+                return schema.decode(getData());
+            }
         } else {
             return schema.decode(getData());
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 9ac887e..22f235e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -34,8 +34,11 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
 
 public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
@@ -64,6 +67,11 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> key(String key) {
+        if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+            checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
+                    "This method is not allowed to set keys when in encoding type is SEPARATED");
+        }
         msgMetadataBuilder.setPartitionKey(key);
         msgMetadataBuilder.setPartitionKeyB64Encoded(false);
         return this;
@@ -71,6 +79,11 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> keyBytes(byte[] key) {
+        if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+            checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
+                    "This method is not allowed to set keys when in encoding type is SEPARATED");
+        }
         msgMetadataBuilder.setPartitionKey(Base64.getEncoder().encodeToString(key));
         msgMetadataBuilder.setPartitionKeyB64Encoded(true);
         return this;
@@ -84,7 +97,21 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> value(T value) {
+
         checkArgument(value != null, "Need Non-Null content value");
+        if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+            org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
+            if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+                // set key as the message key
+                msgMetadataBuilder.setPartitionKey(
+                        Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
+                msgMetadataBuilder.setPartitionKeyB64Encoded(true);
+                // set value as the payload
+                this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
+                return this;
+            }
+        }
         this.content = ByteBuffer.wrap(schema.encode(value));
         return this;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index 9a913d1..d91140b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl.schema;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
@@ -28,7 +29,9 @@ import com.google.gson.Gson;
 import lombok.Getter;
 
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -37,6 +40,7 @@ import org.apache.pulsar.common.schema.SchemaType;
  * [Key, Value] pair schema definition
  */
 public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
+
     @Getter
     private final Schema<K> keySchema;
     @Getter
@@ -46,21 +50,31 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
     //   [keyInfo.length][keyInfo][valueInfo.length][ValueInfo]
     private final SchemaInfo schemaInfo;
 
+    @Getter
+    private final KeyValueEncodingType keyValueEncodingType;
+
     /**
      * Key Value Schema using passed in schema type, support JSON and AVRO currently.
      */
     public static <K, V> Schema<KeyValue<K, V>> of(Class<K> key, Class<V> value, SchemaType type) {
         checkArgument(SchemaType.JSON == type || SchemaType.AVRO == type);
         if (SchemaType.JSON == type) {
-            return new KeyValueSchema<>(JSONSchema.of(key), JSONSchema.of(value));
+            return new KeyValueSchema<>(JSONSchema.of(key), JSONSchema.of(value), KeyValueEncodingType.INLINE);
         } else {
             // AVRO
-            return new KeyValueSchema<>(AvroSchema.of(key), AvroSchema.of(value));
+            return new KeyValueSchema<>(AvroSchema.of(key), AvroSchema.of(value), KeyValueEncodingType.INLINE);
         }
     }
 
+
     public static <K, V> Schema<KeyValue<K, V>> of(Schema<K> keySchema, Schema<V> valueSchema) {
-        return new KeyValueSchema<>(keySchema, valueSchema);
+        return new KeyValueSchema<>(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+    }
+
+    public static <K, V> Schema<KeyValue<K, V>> of(Schema<K> keySchema,
+                                                   Schema<V> valueSchema,
+                                                   KeyValueEncodingType keyValueEncodingType) {
+        return new KeyValueSchema<>(keySchema, valueSchema, keyValueEncodingType);
     }
 
     private static final Schema<KeyValue<byte[], byte[]>> KV_BYTES = new KeyValueSchema<>(
@@ -71,15 +85,22 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         return KV_BYTES;
     }
 
+
     private KeyValueSchema(Schema<K> keySchema,
                            Schema<V> valueSchema) {
+        this(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+    }
+
+    private KeyValueSchema(Schema<K> keySchema,
+                           Schema<V> valueSchema,
+                           KeyValueEncodingType keyValueEncodingType) {
         this.keySchema = keySchema;
         this.valueSchema = valueSchema;
 
         // set schemaInfo
         this.schemaInfo = new SchemaInfo()
-            .setName("KeyValue")
-            .setType(SchemaType.KEY_VALUE);
+                .setName("KeyValue")
+                .setType(SchemaType.KEY_VALUE);
 
         byte[] keySchemaInfo = keySchema.getSchemaInfo().getSchema();
         byte[] valueSchemaInfo = valueSchema.getSchemaInfo().getSchema();
@@ -89,6 +110,7 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
                 .putInt(valueSchemaInfo.length).put(valueSchemaInfo);
 
         Map<String, String> properties = Maps.newHashMap();
+
         properties.put("key.schema.name", keySchema.getSchemaInfo().getName());
         properties.put("key.schema.type", String.valueOf(keySchema.getSchemaInfo().getType()));
         Gson keySchemaGson = new Gson();
@@ -98,20 +120,31 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         Gson valueSchemaGson = new Gson();
         properties.put("value.schema.properties", valueSchemaGson.toJson(valueSchema.getSchemaInfo().getProperties()));
 
+        checkNotNull(keyValueEncodingType, "Null encoding type is provided");
+        this.keyValueEncodingType = keyValueEncodingType;
+        properties.put("kv.encoding.type", String.valueOf(keyValueEncodingType));
+
         this.schemaInfo.setSchema(byteBuffer.array()).setProperties(properties);
     }
 
-    // encode as bytes: [key.length][key.bytes][value.length][value.bytes]
+    // encode as bytes: [key.length][key.bytes][value.length][value.bytes] or [value.bytes]
     public byte[] encode(KeyValue<K, V> message) {
-        byte[] keyBytes = keySchema.encode(message.getKey());
-        byte[] valueBytes = valueSchema.encode(message.getValue());
+        if (keyValueEncodingType != null && keyValueEncodingType == KeyValueEncodingType.INLINE) {
+            byte [] keyBytes = keySchema.encode(message.getKey());
+            byte [] valueBytes = valueSchema.encode(message.getValue());
+            ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
+            byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
+            return byteBuffer.array();
+        } else {
+            return valueSchema.encode(message.getValue());
+        }
 
-        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
-        byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
-        return byteBuffer.array();
     }
 
     public KeyValue<K, V> decode(byte[] bytes) {
+        if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
+            throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
+        }
         ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
         int keyLength = byteBuffer.getInt();
         byte[] keyBytes = new byte[keyLength];
@@ -121,6 +154,10 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         byte[] valueBytes = new byte[valueLength];
         byteBuffer.get(valueBytes);
 
+        return decode(keyBytes, valueBytes);
+    }
+
+    public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes) {
         return new KeyValue<>(keySchema.decode(keyBytes), valueSchema.decode(valueBytes));
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 66db769..13a08b9 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -18,13 +18,21 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-
 import java.nio.ByteBuffer;
+import java.util.Base64;
+
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 /**
  * Unit test of {@link MessageImpl}.
@@ -71,4 +79,78 @@ public class MessageImplTest {
         assertEquals("test-producer", msg.getProducerName());
     }
 
+    @Test
+    public void testDefaultGetProducerDataAssigned() {
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        // // Check kv.encoding.type default, not set value
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("default");
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+        assertEquals(keyValue.getKey(), foo);
+        assertEquals(keyValue.getValue(), bar);
+        assertFalse(builder.hasPartitionKey());
+    }
+
+    @Test
+    public void testInlineGetProducerDataAssigned() {
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.INLINE);
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        // Check kv.encoding.type INLINE
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("inline");
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+        assertEquals(keyValue.getKey(), foo);
+        assertEquals(keyValue.getValue(), bar);
+        assertFalse(builder.hasPartitionKey());
+    }
+
+    @Test
+    public void testSeparatedGetProducerDataAssigned() {
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        // Check kv.encoding.type SPRAERATE
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("separated");
+        builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
+        builder.setPartitionKeyB64Encoded(true);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+        assertEquals(keyValue.getKey(), foo);
+        assertEquals(keyValue.getValue(), bar);
+        assertTrue(builder.hasPartitionKey());
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
new file mode 100644
index 0000000..09fde05
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.mockito.Mock;
+import org.testng.annotations.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Base64;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit test of {@link TypedMessageBuilderImpl}.
+ */
+public class TypedMessageBuilderImplTest {
+
+    @Mock
+    protected ProducerBase producerBase;
+
+    @Test
+    public void testDefaultValue() {
+        producerBase = mock(ProducerBase.class);
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl(producerBase, keyValueSchema);
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = new KeyValue<>(foo, bar);
+
+        // Check kv.encoding.type default, not set value
+        TypedMessageBuilderImpl<KeyValue>  typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
+        ByteBuffer content = typedMessageBuilder.getContent();
+        byte[] contentByte = new byte[content.remaining()];
+        content.get(contentByte);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>  decodeKeyValue = keyValueSchema.decode(contentByte);
+        assertEquals(decodeKeyValue.getKey(), foo);
+        assertEquals(decodeKeyValue.getValue(), bar);
+        assertEquals(typedMessageBuilderImpl.hasKey(), false);
+    }
+
+    @Test
+    public void testInlineValue() {
+        producerBase = mock(ProducerBase.class);
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.INLINE);
+        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl(producerBase, keyValueSchema);
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = new KeyValue<>(foo, bar);
+
+        // Check kv.encoding.type INLINE
+        TypedMessageBuilderImpl<KeyValue> typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
+        ByteBuffer content = typedMessageBuilder.getContent();
+        byte[] contentByte = new byte[content.remaining()];
+        content.get(contentByte);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> decodeKeyValue = keyValueSchema.decode(contentByte);
+        assertEquals(decodeKeyValue.getKey(), foo);
+        assertEquals(decodeKeyValue.getValue(), bar);
+        assertEquals(typedMessageBuilderImpl.hasKey(), false);
+    }
+
+    @Test
+    public void testSeparatedValue() {
+        producerBase = mock(ProducerBase.class);
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl(producerBase, keyValueSchema);
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = new KeyValue<>(foo, bar);
+
+        // Check kv.encoding.type SEPARATED
+        TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
+        ByteBuffer content = typedMessageBuilder.getContent();
+        byte[] contentByte = new byte[content.remaining()];
+        content.get(contentByte);
+        assertEquals(typedMessageBuilderImpl.hasKey(), true);
+        assertEquals(typedMessageBuilderImpl.getKey(), Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
+        assertEquals(barSchema.decode(contentByte), bar);
+    }
+
+    @Test
+    public void testSetKeyEncodingTypeDefault() {
+        producerBase = mock(ProducerBase.class);
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl(producerBase, keyValueSchema);
+
+        TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.key("default");
+        assertEquals(typedMessageBuilder.getKey(), "default");
+        assertFalse(typedMessageBuilder.getMetadataBuilder().getPartitionKeyB64Encoded());
+    }
+
+    @Test
+    public void testSetKeyEncodingTypeInline() {
+        producerBase = mock(ProducerBase.class);
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.INLINE);
+        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl(producerBase, keyValueSchema);
+
+        TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.key("inline");
+        assertEquals(typedMessageBuilder.getKey(), "inline");
+        assertFalse(typedMessageBuilder.getMetadataBuilder().getPartitionKeyB64Encoded());
+    }
+
+    @Test
+    public void testSetKeyEncodingTypeSeparated() {
+        producerBase = mock(ProducerBase.class);
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl(producerBase, keyValueSchema);
+
+
+        try {
+            TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.key("separated");
+            fail("This should fail");
+        } catch (IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("This method is not allowed to set keys when in encoding type is SEPARATED"));
+        }
+    }
+
+    @Test
+    public void testSetKeyBytesEncodingTypeDefault() {
+        producerBase = mock(ProducerBase.class);
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl(producerBase, keyValueSchema);
+
+        TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.keyBytes("default".getBytes());
+        assertEquals(typedMessageBuilder.getKey(), Base64.getEncoder().encodeToString("default".getBytes()));
+        assertTrue(typedMessageBuilder.getMetadataBuilder().getPartitionKeyB64Encoded());
+    }
+
+    @Test
+    public void testSetKeyBytesEncodingTypeInline() {
+        producerBase = mock(ProducerBase.class);
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.INLINE);
+        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl(producerBase, keyValueSchema);
+
+        TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.keyBytes("inline".getBytes());
+        assertEquals(typedMessageBuilder.getKey(), Base64.getEncoder().encodeToString("inline".getBytes()));
+        assertTrue(typedMessageBuilder.getMetadataBuilder().getPartitionKeyB64Encoded());
+    }
+
+    @Test
+    public void testSetKeyBytesEncodingTypeSeparated() {
+        producerBase = mock(ProducerBase.class);
+
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+        TypedMessageBuilderImpl typedMessageBuilderImpl = new TypedMessageBuilderImpl(producerBase, keyValueSchema);
+
+
+        try {
+            TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.keyBytes("separated".getBytes());
+            fail("This should fail");
+        } catch (IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("This method is not allowed to set keys when in encoding type is SEPARATED"));
+        }
+    }
+
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index 13fe029..821ac24 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -23,11 +23,13 @@ import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Color;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -235,6 +237,99 @@ public class KeyValueSchemaTest {
     }
 
     @Test
+    public void testDefaultKeyValueEncodingTypeSchemaEncodeAndDecode() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+
+        Schema<KeyValue<Foo, Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(Color.RED);
+
+        // Check kv.encoding.type default not set value
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        Assert.assertTrue(encodeBytes.length > 0);
+
+        KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(encodeBytes);
+        Foo fooBack = keyValue.getKey();
+        Bar barBack = keyValue.getValue();
+
+        assertEquals(foo, fooBack);
+        assertEquals(bar, barBack);
+    }
+
+    @Test
+    public void testInlineKeyValueEncodingTypeSchemaEncodeAndDecode() {
+
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+
+        Schema<KeyValue<Foo, Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.INLINE);
+
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(Color.RED);
+
+        // Check kv.encoding.type INLINE
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        Assert.assertTrue(encodeBytes.length > 0);
+        KeyValue<Foo, Bar>  keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(encodeBytes);
+        Foo fooBack = keyValue.getKey();
+        Bar barBack = keyValue.getValue();
+        assertEquals(foo, fooBack);
+        assertEquals(bar, barBack);
+
+    }
+
+    @Test
+    public void testSeparatedKeyValueEncodingTypeSchemaEncodeAndDecode() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+
+        Schema<KeyValue<Foo, Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(Color.RED);
+
+        // Check kv.encoding.type SEPARATED
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        Assert.assertTrue(encodeBytes.length > 0);
+        try {
+            keyValueSchema.decode(encodeBytes);
+            Assert.fail("This method cannot be used under this SEPARATED encoding type");
+        } catch (SchemaSerializationException e) {
+            Assert.assertTrue(e.getMessage().contains("This method cannot be used under this SEPARATED encoding type"));
+        }
+        KeyValue<Foo, Bar>  keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(fooSchema.encode(foo), encodeBytes);
+        Foo fooBack = keyValue.getKey();
+        Bar barBack = keyValue.getValue();
+        assertEquals(foo, fooBack);
+        assertEquals(bar, barBack);
+    }
+
+    @Test
     public void testAllowNullBytesSchemaEncodeAndDecode() {
         AvroSchema<Foo> fooAvroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
         AvroSchema<Bar> barAvroSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());