You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/20 02:57:14 UTC

[pulsar] branch master updated: [issue#4155][pulsar-clients]Support key value schema versioning (#4211)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 58986a8  [issue#4155][pulsar-clients]Support key value schema versioning (#4211)
58986a8 is described below

commit 58986a854db545a2042b98d2a7fd59ab93bc7d53
Author: tuteng <eg...@gmail.com>
AuthorDate: Mon May 20 10:57:08 2019 +0800

    [issue#4155][pulsar-clients]Support key value schema versioning (#4211)
    
    Fixes #4155
    
    Master Issue: #4155
    
    ### Motivation
    
    Similar to AVRO and JSON schema, we need to support schema versioning for key/value schema.
    
    ### Modifications
    
    Support key value schema versioning.
    
    ### Verifying this change
    
    unit test pass
---
 .../org/apache/pulsar/client/impl/MessageImpl.java |  42 +++-
 .../pulsar/client/impl/schema/KeyValueSchema.java  | 125 +++++++++-
 .../apache/pulsar/client/impl/MessageImplTest.java | 266 ++++++++++++++++++++-
 .../client/impl/schema/KeyValueSchemaTest.java     |  10 +-
 .../SupportVersioningKeyValueSchemaTest.java       | 166 +++++++++++++
 5 files changed, 581 insertions(+), 28 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index d11d177..372fb23 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -242,18 +242,42 @@ public class MessageImpl<T> implements Message<T> {
 
     @Override
     public T getValue() {
-        // check if the schema passed in from client supports schema versioning or not
-        // this is an optimization to only get schema version when necessary
-        byte [] schemaVersion = getSchemaVersion();
-        if (schema.supportSchemaVersioning() && schemaVersion != null) {
-            return schema.decode(getData(), schemaVersion);
-        } else if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
-            KeyValueSchema kvSchema = (KeyValueSchema) schema;
-            if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
-                return schema.decode(getKeyBytes(), getData());
+        if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
+            if (schema.supportSchemaVersioning()) {
+                return getKeyValueBySchemaVersion();
+            } else {
+                return getKeyValue();
+            }
+        } else {
+            // check if the schema passed in from client supports schema versioning or not
+            // this is an optimization to only get schema version when necessary
+            if (schema.supportSchemaVersioning()) {
+                byte[] schemaVersion = getSchemaVersion();
+                if (null == schemaVersion) {
+                    return schema.decode(getData());
+                } else {
+                    return schema.decode(getData(), schemaVersion);
+                }
             } else {
                 return schema.decode(getData());
             }
+        }
+    }
+
+    private T getKeyValueBySchemaVersion() {
+        KeyValueSchema kvSchema = (KeyValueSchema) schema;
+        byte[] schemaVersion = getSchemaVersion();
+        if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+            return (T) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
+        } else {
+            return schema.decode(getData(), schemaVersion);
+        }
+    }
+
+    private T getKeyValue() {
+        KeyValueSchema kvSchema = (KeyValueSchema) schema;
+        if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+            return (T) kvSchema.decode(getKeyBytes(), getData(), null);
         } else {
             return schema.decode(getData());
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index d91140b..066d3b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 
 import com.google.common.collect.Maps;
@@ -30,12 +31,12 @@ import lombok.Getter;
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
-
 /**
  * [Key, Value] pair schema definition
  */
@@ -53,6 +54,8 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
     @Getter
     private final KeyValueEncodingType keyValueEncodingType;
 
+    protected SchemaInfoProvider schemaInfoProvider;
+
     /**
      * Key Value Schema using passed in schema type, support JSON and AVRO currently.
      */
@@ -85,6 +88,10 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         return KV_BYTES;
     }
 
+    @Override
+    public boolean supportSchemaVersioning() {
+        return keySchema.supportSchemaVersioning() || valueSchema.supportSchemaVersioning();
+    }
 
     private KeyValueSchema(Schema<K> keySchema,
                            Schema<V> valueSchema) {
@@ -97,6 +104,46 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         this.keySchema = keySchema;
         this.valueSchema = valueSchema;
 
+        if (keySchema instanceof StructSchema) {
+            ((StructSchema) keySchema).setSchemaInfoProvider(new SchemaInfoProvider() {
+                @Override
+                public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
+                    SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+                    return decodeKeyValueSchemaInfo(versionSchemaInfo).getKey();
+                }
+
+                @Override
+                public SchemaInfo getLatestSchema() {
+                    return ((StructSchema<K>) keySchema).schemaInfo;
+                }
+
+                @Override
+                public String getTopicName() {
+                    return "key-schema";
+                }
+            });
+        }
+
+        if (valueSchema instanceof StructSchema) {
+            ((StructSchema) valueSchema).setSchemaInfoProvider(new SchemaInfoProvider() {
+                @Override
+                public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
+                    SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+                    return decodeKeyValueSchemaInfo(versionSchemaInfo).getValue();
+                }
+
+                @Override
+                public SchemaInfo getLatestSchema() {
+                    return ((StructSchema<V>) valueSchema).schemaInfo;
+                }
+
+                @Override
+                public String getTopicName() {
+                    return "value-schema";
+                }
+            });
+        }
+
         // set schemaInfo
         this.schemaInfo = new SchemaInfo()
                 .setName("KeyValue")
@@ -125,6 +172,23 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         properties.put("kv.encoding.type", String.valueOf(keyValueEncodingType));
 
         this.schemaInfo.setSchema(byteBuffer.array()).setProperties(properties);
+
+        this.schemaInfoProvider = new SchemaInfoProvider() {
+            @Override
+            public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
+                return schemaInfo;
+            }
+
+            @Override
+            public SchemaInfo getLatestSchema() {
+                return schemaInfo;
+            }
+
+            @Override
+            public String getTopicName() {
+                return "key-value-schema";
+            }
+        };
     }
 
     // encode as bytes: [key.length][key.bytes][value.length][value.bytes] or [value.bytes]
@@ -142,6 +206,10 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
     }
 
     public KeyValue<K, V> decode(byte[] bytes) {
+        return decode(bytes, null);
+    }
+
+    public KeyValue<K, V> decode(byte[] bytes, byte[] schemaVersion) {
         if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
             throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
         }
@@ -154,14 +222,63 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         byte[] valueBytes = new byte[valueLength];
         byteBuffer.get(valueBytes);
 
-        return decode(keyBytes, valueBytes);
+        return decode(keyBytes, valueBytes, schemaVersion);
     }
 
-    public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes) {
-        return new KeyValue<>(keySchema.decode(keyBytes), valueSchema.decode(valueBytes));
+    public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
+        K k;
+        if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
+            k = keySchema.decode(keyBytes, schemaVersion);
+        } else {
+            k = keySchema.decode(keyBytes);
+        }
+        V v;
+        if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
+            v = valueSchema.decode(valueBytes, schemaVersion);
+        } else {
+            v = valueSchema.decode(valueBytes);
+        }
+        return new KeyValue<>(k, v);
     }
 
     public SchemaInfo getSchemaInfo() {
         return this.schemaInfo;
     }
+
+    public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+        this.schemaInfoProvider = schemaInfoProvider;
+    }
+
+    private static KeyValue<SchemaInfo, SchemaInfo> decodeKeyValueSchemaInfo(SchemaInfo schemaInfo) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(schemaInfo.getSchema());
+        int keySchemaLength = byteBuffer.getInt();
+        byte[] key = new byte[keySchemaLength];
+        byteBuffer.get(key);
+        int valueSchemaLength = byteBuffer.getInt();
+        byte[] value = new byte[valueSchemaLength];
+        byteBuffer.get(value);
+        Gson keySchemaGson = new Gson();
+        Map<String, String> keyProperties = Maps.newHashMap();
+        if (schemaInfo.getProperties().get("key.schema.properties") != null) {
+            keyProperties = keySchemaGson.fromJson(schemaInfo.getProperties().get("key.schema.properties"), Map.class);
+        } else {
+            keyProperties = Collections.emptyMap();
+        }
+        SchemaInfo keySchemaInfo = SchemaInfo.builder().schema(key)
+                .properties(keyProperties)
+                .name("")
+                .type(SchemaType.AVRO).build();
+        Gson valueSchemaGson = new Gson();
+        Map<String, String> valueProperties = Maps.newHashMap();
+        if (schemaInfo.getProperties().get("value.schema.properties") != null) {
+            valueProperties = valueSchemaGson.fromJson(schemaInfo.getProperties().get("value.schema.properties"), Map.class);
+        } else {
+            valueProperties = Collections.emptyMap();
+        }
+        SchemaInfo valueSchemaInfo = SchemaInfo.builder().schema(value)
+                .properties(valueProperties)
+                .name("")
+                .type(SchemaType.AVRO).build();
+        return new KeyValue<>(keySchemaInfo, valueSchemaInfo);
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 13a08b9..5475050 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -24,11 +24,19 @@ import java.util.Base64;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
+import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.testng.Assert;
 import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertFalse;
@@ -81,8 +89,10 @@ public class MessageImplTest {
 
     @Test
     public void testDefaultGetProducerDataAssigned() {
-        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
-        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
 
         Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
         SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
@@ -96,7 +106,8 @@ public class MessageImplTest {
         byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
         MessageMetadata.Builder builder = MessageMetadata.newBuilder()
                 .setProducerName("default");
-        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -106,10 +117,13 @@ public class MessageImplTest {
     @Test
     public void testInlineGetProducerDataAssigned() {
 
-        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
-        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
 
-        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.INLINE);
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+                fooSchema, barSchema, KeyValueEncodingType.INLINE);
         SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
         foo.setField1("field1");
         foo.setField2("field2");
@@ -121,7 +135,8 @@ public class MessageImplTest {
         byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
         MessageMetadata.Builder builder = MessageMetadata.newBuilder()
                 .setProducerName("inline");
-        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -130,10 +145,13 @@ public class MessageImplTest {
 
     @Test
     public void testSeparatedGetProducerDataAssigned() {
-        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
-        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
 
-        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+                fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
         SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
         foo.setField1("field1");
         foo.setField2("field2");
@@ -147,10 +165,236 @@ public class MessageImplTest {
                 .setProducerName("separated");
         builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
         builder.setPartitionKeyB64Encoded(true);
-        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+        assertEquals(keyValue.getKey(), foo);
+        assertEquals(keyValue.getValue(), bar);
+        assertTrue(builder.hasPartitionKey());
+    }
+
+    @Test
+    public void testDefaultAVROVersionGetProducerDataAssigned() {
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+        keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(keyValueSchema.getSchemaInfo());
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("default");
+        ByteString byteString = ByteString.copyFrom(new byte[10]);
+        builder.setSchemaVersion(byteString);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+        assertEquals(keyValue.getKey(), foo);
+        assertEquals(keyValue.getValue(), bar);
+        assertFalse(builder.hasPartitionKey());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.INLINE);
+    }
+
+    @Test
+    public void testSeparatedAVROVersionGetProducerDataAssigned() {
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+                fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+        keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(keyValueSchema.getSchemaInfo());
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("separated");
+        ByteString byteString = ByteString.copyFrom(new byte[10]);
+        builder.setSchemaVersion(byteString);
+        builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
+        builder.setPartitionKeyB64Encoded(true);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+        assertEquals(keyValue.getKey(), foo);
+        assertEquals(keyValue.getValue(), bar);
+        assertTrue(builder.hasPartitionKey());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.SEPARATED);
+    }
+
+    @Test
+    public void testDefaultJSONVersionGetProducerDataAssigned() {
+        JSONSchema<SchemaTestUtils.Foo> fooSchema = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(
+                SchemaTestUtils.Foo.class).build());
+        JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(
+                SchemaTestUtils.Bar.class).build());
+
+        MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+        keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(keyValueSchema.getSchemaInfo());
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("default");
+        ByteString byteString = ByteString.copyFrom(new byte[10]);
+        builder.setSchemaVersion(byteString);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+        assertEquals(keyValue.getKey(), foo);
+        assertEquals(keyValue.getValue(), bar);
+        assertFalse(builder.hasPartitionKey());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.INLINE);
+    }
+
+    @Test
+    public void testSeparatedJSONVersionGetProducerDataAssigned() {
+        JSONSchema<SchemaTestUtils.Foo> fooSchema = JSONSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+                fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+        keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(keyValueSchema.getSchemaInfo());
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("separated");
+        ByteString byteString = ByteString.copyFrom(new byte[10]);
+        builder.setSchemaVersion(byteString);
+        builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
+        builder.setPartitionKeyB64Encoded(true);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+        assertEquals(keyValue.getKey(), foo);
+        assertEquals(keyValue.getValue(), bar);
+        assertTrue(builder.hasPartitionKey());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.SEPARATED);
+    }
+
+    @Test
+    public void testDefaultAVROJSONVersionGetProducerDataAssigned() {
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(
+                SchemaTestUtils.Foo.class).build());
+        JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(
+                SchemaTestUtils.Bar.class).build());
+
+        MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
+        keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(keyValueSchema.getSchemaInfo());
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("default");
+        ByteString byteString = ByteString.copyFrom(new byte[10]);
+        builder.setSchemaVersion(byteString);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
+        assertEquals(keyValue.getKey(), foo);
+        assertEquals(keyValue.getValue(), bar);
+        assertFalse(builder.hasPartitionKey());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.INLINE);
+    }
+
+    @Test
+    public void testSeparatedAVROJSONVersionGetProducerDataAssigned() {
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        JSONSchema<SchemaTestUtils.Bar> barSchema = JSONSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+
+        MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(
+                fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+        keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(keyValueSchema.getSchemaInfo());
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        MessageMetadata.Builder builder = MessageMetadata.newBuilder()
+                .setProducerName("separated");
+        ByteString byteString = ByteString.copyFrom(new byte[10]);
+        builder.setSchemaVersion(byteString);
+        builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
+        builder.setPartitionKeyB64Encoded(true);
+        MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = MessageImpl.create(
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
         assertTrue(builder.hasPartitionKey());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.SEPARATED);
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index 821ac24..9c903fa 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -36,6 +36,7 @@ import org.testng.annotations.Test;
 
 import java.util.Map;
 
+
 @Slf4j
 public class KeyValueSchemaTest {
 
@@ -151,8 +152,10 @@ public class KeyValueSchemaTest {
 
     @Test
     public void testNotAllowNullJsonSchemaCreate() {
-        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
-        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
+        JSONSchema<Foo> fooSchema = JSONSchema.of(
+                SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(
+                SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
 
         Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
         Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
@@ -302,7 +305,6 @@ public class KeyValueSchemaTest {
 
         Schema<KeyValue<Foo, Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
 
-
         Bar bar = new Bar();
         bar.setField1(true);
 
@@ -322,7 +324,7 @@ public class KeyValueSchemaTest {
         } catch (SchemaSerializationException e) {
             Assert.assertTrue(e.getMessage().contains("This method cannot be used under this SEPARATED encoding type"));
         }
-        KeyValue<Foo, Bar>  keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(fooSchema.encode(foo), encodeBytes);
+        KeyValue<Foo, Bar>  keyValue = ((KeyValueSchema)keyValueSchema).decode(fooSchema.encode(foo), encodeBytes, null);
         Foo fooBack = keyValue.getKey();
         Bar barBack = keyValue.getValue();
         assertEquals(foo, fooBack);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
new file mode 100644
index 0000000..375baf5
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class SupportVersioningKeyValueSchemaTest {
+
+    @Test
+    public void testKeyValueVersioningEncodeDecode() {
+        MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+                fooSchema, barSchema);
+        keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(keyValueSchema.getSchemaInfo());
+
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(SchemaTestUtils.Color.RED);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = keyValueSchema.decode(
+                encodeBytes, new byte[10]);
+        Assert.assertEquals(keyValue.getKey().getField1(), foo.getField1());
+        Assert.assertEquals(keyValue.getKey().getField2(), foo.getField2());
+        Assert.assertEquals(keyValue.getKey().getField3(), foo.getField3());
+        Assert.assertEquals(keyValue.getKey().getField4(), foo.getField4());
+        Assert.assertEquals(keyValue.getKey().getColor(), foo.getColor());
+        Assert.assertTrue(keyValue.getValue().isField1());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.INLINE);
+    }
+
+    @Test
+    public void testSeparateKeyValueVersioningEncodeDecode() {
+        MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+                fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+        keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(keyValueSchema.getSchemaInfo());
+
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(SchemaTestUtils.Color.RED);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchema)keyValueSchema).decode(
+                fooSchema.encode(foo), encodeBytes, new byte[10]);
+        Assert.assertTrue(keyValue.getValue().isField1());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.SEPARATED);
+    }
+
+    @Test
+    public void testKeyValueDefaultVersioningEncodeDecode() {
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+                fooSchema, barSchema);
+
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(SchemaTestUtils.Color.RED);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = keyValueSchema.decode(
+                encodeBytes, new byte[10]);
+        Assert.assertEquals(keyValue.getKey().getField1(), foo.getField1());
+        Assert.assertEquals(keyValue.getKey().getField2(), foo.getField2());
+        Assert.assertEquals(keyValue.getKey().getField3(), foo.getField3());
+        Assert.assertEquals(keyValue.getKey().getField4(), foo.getField4());
+        Assert.assertEquals(keyValue.getKey().getColor(), foo.getColor());
+        Assert.assertTrue(keyValue.getValue().isField1());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.INLINE);
+    }
+
+    @Test
+    public void testKeyValueLatestVersioningEncodeDecode() {
+        AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
+        AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+                fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
+
+        SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
+        bar.setField1(true);
+
+        SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(SchemaTestUtils.Color.RED);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchema)keyValueSchema).decode(
+                fooSchema.encode(foo), encodeBytes, new byte[10]);
+        Assert.assertTrue(keyValue.getValue().isField1());
+        Assert.assertEquals(
+                KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
+                KeyValueEncodingType.SEPARATED);
+    }
+}