You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/05/22 01:56:56 UTC

[pulsar] branch master updated: [pulsar-broker]Support key value schema compatibility checker (#4192)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a98c846  [pulsar-broker]Support key value schema compatibility checker (#4192)
a98c846 is described below

commit a98c846243760a780748d15209c12c5471f1ae77
Author: tuteng <eg...@gmail.com>
AuthorDate: Wed May 22 09:56:51 2019 +0800

    [pulsar-broker]Support key value schema compatibility checker (#4192)
    
    Motivation
    The key/value types can be AVRO or JSON. Both AVRO and JSON supports schema evolution. We should add a key/value schema compatibility checker to allow schema evolution in broker side.
    
    Modifications
    Add KeyValueSchemaCompatibilityCheck class
    Add Unit Test
    
    Verifying this change
    Unit Test Pass
---
 .../schema/KeyValueSchemaCompatibilityCheck.java   | 130 ++++++
 .../service/schema/SchemaRegistryService.java      |   2 +
 .../KeyValueSchemaCompatibilityCheckTest.java      | 500 +++++++++++++++++++++
 .../integration/functions/PulsarFunctionsTest.java |   4 +-
 .../integration/io/DebeziumMySqlSourceTester.java  |  11 +-
 5 files changed, 643 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
new file mode 100644
index 0000000..f9c9fc2
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.gson.Gson;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.SchemaValidationException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * {@link KeyValueSchemaCompatibilityCheck} for {@link SchemaType#KEY_VALUE}.
+ */
+public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
+
+    private final Map<SchemaType, SchemaCompatibilityCheck> checkers;
+
+    public KeyValueSchemaCompatibilityCheck(Map<SchemaType, SchemaCompatibilityCheck> checkers) {
+        this.checkers = checkers;
+    }
+
+    private KeyValue<byte[], byte[]> splitKeyValueSchema(byte[] bytes) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+        int keyLength = byteBuffer.getInt();
+        byte[] keySchema = new byte[keyLength];
+        byteBuffer.get(keySchema);
+
+        int valueLength = byteBuffer.getInt();
+        byte[] valueSchema = new byte[valueLength];
+        byteBuffer.get(valueSchema);
+        return new KeyValue<>(keySchema, valueSchema);
+    }
+
+    private SchemaType fetchSchemaType(Map<String, String> properties, String key) {
+        if (properties.get(key) != null) {
+            return SchemaType.valueOf(properties.get(key));
+        }
+        return SchemaType.BYTES;
+    }
+
+    private SchemaData fetchSchemaData(
+            byte[] keyValue, SchemaType schemaType, Gson schemaGson, Map<String, String> properties, String key) {
+        if (properties.get(key) != null) {
+            return SchemaData.builder().data(keyValue)
+                    .type(schemaType)
+                    .props(schemaGson.fromJson(properties.get(key), Map.class)).build();
+        }
+        return SchemaData.builder().data(keyValue)
+                .type(schemaType)
+                .props(Collections.emptyMap()).build();
+    }
+
+    private KeyValue<SchemaData, SchemaData> splitKeyValueSchemaData(SchemaData schemaData) {
+        KeyValue<byte[], byte[]> keyValue = this.splitKeyValueSchema(schemaData.getData());
+        Map<String, String> properties = schemaData.getProps();
+        SchemaType keyType = fetchSchemaType(properties, "key.schema.type");
+        SchemaType valueType = fetchSchemaType(properties, "value.schema.type");
+        Gson schemaGson = new Gson();
+        SchemaData keySchemaData = fetchSchemaData(
+                keyValue.getKey(), keyType, schemaGson, properties, "key.schema.properties");
+        SchemaData valueSchemaData = fetchSchemaData(
+                keyValue.getValue(), valueType, schemaGson, properties, "value.schema.properties");
+        return new KeyValue<>(keySchemaData, valueSchemaData);
+    }
+
+    @Override
+    public SchemaType getSchemaType() {
+        return SchemaType.KEY_VALUE;
+    }
+
+    @Override
+    public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
+        return isCompatible(Collections.singletonList(from), to, strategy);
+    }
+
+    @Override
+    public boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) {
+        if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
+            return true;
+        }
+        if (to.getType() != SchemaType.KEY_VALUE) {
+            return false;
+        }
+        LinkedList<SchemaData> fromKeyList = new LinkedList<>();
+        LinkedList<SchemaData> fromValueList = new LinkedList<>();
+        KeyValue<SchemaData, SchemaData> fromKeyValue;
+        KeyValue<SchemaData, SchemaData> toKeyValue = splitKeyValueSchemaData(to);
+        SchemaType toKeyType = toKeyValue.getKey().getType();
+        SchemaType toValueType = toKeyValue.getValue().getType();
+
+        for (SchemaData schemaData : from) {
+            if (schemaData.getType() != SchemaType.KEY_VALUE) {
+                return false;
+            }
+            fromKeyValue = splitKeyValueSchemaData(schemaData);
+            if (fromKeyValue.getKey().getType() != toKeyType || fromKeyValue.getValue().getType() != toValueType) {
+                return false;
+            }
+            fromKeyList.addFirst(fromKeyValue.getKey());
+            fromValueList.addFirst(fromKeyValue.getValue());
+        }
+        SchemaCompatibilityCheck keyCheck = checkers.getOrDefault(toKeyType, SchemaCompatibilityCheck.DEFAULT);
+        SchemaCompatibilityCheck valueCheck = checkers.getOrDefault(toValueType, SchemaCompatibilityCheck.DEFAULT);
+        return keyCheck.isCompatible(fromKeyList, toKeyValue.getKey(), strategy)
+                && valueCheck.isCompatible(fromValueList, toKeyValue.getValue(), strategy);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index a74066b..baa57a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -54,6 +54,8 @@ public interface SchemaRegistryService extends SchemaRegistry {
             Map<SchemaType, SchemaCompatibilityCheck> checkers =
                 getCheckers(config.getSchemaRegistryCompatibilityCheckers());
 
+            checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
+
             schemaStorage.start();
 
             return new SchemaRegistryServiceImpl(schemaStorage, checkers);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
new file mode 100644
index 0000000..0ba8a1c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.collect.Maps;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+public class KeyValueSchemaCompatibilityCheckTest {
+
+    private final Map<SchemaType, SchemaCompatibilityCheck> checkers = Maps.newHashMap();
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    private static class Foo {
+        private String field1;
+        private String field2;
+        private int field3;
+        private KeyValueSchemaCompatibilityCheckTest.Bar field4;
+    }
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    private static class Bar {
+        private boolean field1;
+    }
+
+    @BeforeClass
+    protected void setup() {
+        checkers.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());
+        checkers.put(SchemaType.JSON, new JsonSchemaCompatibilityCheck());
+        checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
+    }
+
+    @Test
+    public void testCheckKeyValueAvroCompatibilityFull() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckKeyValueAvroInCompatibilityFull() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckKeyValueAvroCompatibilityBackward() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+    }
+
+    @Test
+    public void testCheckKeyValueAvroInCompatibilityBackward() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+    }
+
+    @Test
+    public void testCheckKeyValueAvroCompatibilityForward() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckKeyValueAvroInCompatibilityForward() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckKeyValueJsonCompatibilityFull() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckKeyValueJsonInCompatibilityFull() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckKeyValueJsonCompatibilityBackward() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+    }
+
+    @Test
+    public void testCheckKeyValueJsonInCompatibilityBackWard() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+    }
+
+    @Test
+    public void testCheckKeyValueJsonCompatibilityForward() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckKeyValueJsonInCompatibilityForward() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckKeyAvroValueJsonCompatibilityFull() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckKeyAvroValueJsonInCompatibilityFull() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckKeyAvroValueJsonCompatibilityBackward() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+    }
+
+    @Test
+    public void testCheckKeyAvroValueJsonInCompatibilityBackward() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+    }
+
+    @Test
+    public void testCheckKeyAvroValueJsonCompatibilityForward() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckKeyAvroValueJsonInCompatibilityForward() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckKeyJsonValueAvroCompatibilityFull() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckKeyJsonValueAvroInCompatibilityFull() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckKeyJsonValueAvroCompatibilityBackward() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+    }
+
+    @Test
+    public void testCheckKeyJsonValueAvroInCompatibilityBackward() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+    }
+
+
+    @Test
+    public void testCheckKeyJsonValueAvroCompatibilityForward() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckKeyJsonValueAvroInCompatibilityForward() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckKeyJsonValueAvroKeyTypeInCompatibility() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> fromProperties = Maps.newHashMap();
+        fromProperties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        fromProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        Map<String, String> toProperties = Maps.newHashMap();
+        toProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        toProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(barSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckKeyJsonValueAvroValueTypeInCompatibility() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> fromProperties = Maps.newHashMap();
+        fromProperties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        fromProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        Map<String, String> toProperties = Maps.newHashMap();
+        toProperties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+        toProperties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, fooSchema).getSchemaInfo().getSchema()).props(toProperties).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+    }
+
+    @Test
+    public void testCheckPropertiesNullTypeCompatibility() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> fromProperties = Maps.newHashMap();
+        fromProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        fromProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        fromProperties.put("key.schema.properties", null);
+        fromProperties.put("value.schema.properties", null);
+        Map<String, String> toProperties = Maps.newHashMap();
+        toProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+        toProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+        toProperties.put("key.schema.properties", null);
+        toProperties.put("value.schema.properties", null);
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckSchemaTypeNullCompatibility() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        Map<String, String> fromProperties = Maps.newHashMap();
+        fromProperties.put("key.schema.type", null);
+        fromProperties.put("value.schema.type", null);
+        Map<String, String> toProperties = Maps.newHashMap();
+        toProperties.put("key.schema.type", null);
+        toProperties.put("value.schema.type", null);
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+    }
+
+    @Test
+    public void testCheckSchemaTypeAlwaysCompatibility() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        StringSchema stringSchema = new StringSchema();
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.STRING)
+                .data(stringSchema.getSchemaInfo().getSchema()).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).build();
+        Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE));
+    }
+
+    @Test
+    public void testCheckSchemaTypeOtherCompatibility() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        StringSchema stringSchema = new StringSchema();
+        SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.STRING)
+                .data(stringSchema.getSchemaInfo().getSchema()).build();
+        SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).build();
+        Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE));
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index c31329f..5393011 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -42,12 +42,14 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
@@ -1563,7 +1565,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             .build();
 
         @Cleanup
-        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
             .topic(consumeTopicName)
             .subscriptionName("debezium-source-tester")
             .subscriptionType(SubscriptionType.Exclusive)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index ffecbc0..2d1b4b5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -25,6 +25,8 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
@@ -84,13 +86,16 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
         return null;
     }
 
-    public void validateSourceResult(Consumer<String> consumer, int number) throws Exception {
+    public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number) throws Exception {
         int recordsNumber = 0;
-        Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+        Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
         while(msg != null) {
             recordsNumber ++;
             log.info("Received message: {}.", msg.getValue());
-            Assert.assertTrue(msg.getValue().contains("dbserver1.inventory.products"));
+            String key = new String(msg.getValue().getKey());
+            String value = new String(msg.getValue().getValue());
+            Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
+            Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
             consumer.acknowledge(msg);
             msg = consumer.receive(1, TimeUnit.SECONDS);
         }