Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/04/30 13:29:17 UTC

[pulsar] branch master updated: [Pulsar IO][Issue 5633]Support avro schema for debezium connector (#6034)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e08be96  [Pulsar IO][Issue 5633]Support avro schema for debezium connector (#6034)
e08be96 is described below

commit e08be96d98c7730a01be296aae38e39020ff485e
Author: guangning <gu...@apache.org>
AuthorDate: Thu Apr 30 21:29:05 2020 +0800

    [Pulsar IO][Issue 5633]Support avro schema for debezium connector (#6034)
    
    Fixes #5633
    
    
    ### Motivation
    
    Currently, some users want Avro schema support in Debezium, so this PR adds that feature.
    Kafka's Avro converter depends on Avro 1.8, but Avro in Pulsar has just been upgraded to 1.9,
    so shading is needed to avoid failures resolving the incompatible `addProp` function.
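    
    A minimal sketch of the clash, assuming Avro 1.9 on Pulsar's classpath: Avro 1.9
    exposes `addProp(String, Object)`, while the Confluent converter was compiled against
    Avro 1.8, whose `addProp` took an `org.codehaus.jackson.JsonNode`, so mixing the two
    on one classpath fails at runtime; relocating a private Avro copy under
    `org.apache.pulsar.kafka.shade.avro` keeps them apart. (`AddPropSketch` is a
    hypothetical name for illustration.)
    
    ```java
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    
    public class AddPropSketch {
        public static void main(String[] args) {
            Schema schema = SchemaBuilder.record("Foo").fields()
                    .requiredString("field1").endRecord();
            // Avro 1.9 signature: addProp(String, Object); the Avro 1.8 overload
            // taking org.codehaus.jackson.JsonNode no longer exists in 1.9.
            schema.addProp("__AVRO_READ_OFFSET__", "5");
            System.out.println(schema.getObjectProp("__AVRO_READ_OFFSET__"));
        }
    }
    ```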
    
    ### Modifications
    
    * Add a module `kafka-connect-avro-converter-shaded`
    * Add the class `KafkaSchemaWrappedSchema` to convert Kafka's Avro schema to Pulsar's schema (see the sketch below)
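    
    A hedged sketch of the conversion path (`AvroData` and `KafkaSchemaWrappedSchema`
    are names from the diff below; `ConnectToAvroSketch` is hypothetical, and the
    shaded converter is assumed to be on the classpath):
    
    ```java
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
    
    public class ConnectToAvroSketch {
        public static void main(String[] args) {
            // Build a Kafka Connect schema like the ones Debezium emits.
            Schema connectSchema = SchemaBuilder.struct().name("dbserver1.inventory.products")
                    .field("id", Schema.INT32_SCHEMA)
                    .field("name", Schema.STRING_SCHEMA)
                    .build();
            // AvroData(cacheSize) mirrors the `new AvroData(1000)` call in the diff;
            // fromConnectSchema yields the (shaded) Avro schema that
            // KafkaSchemaWrappedSchema then wraps into a Pulsar SchemaInfo.
            AvroData avroData = new AvroData(1000);
            org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
                    avroData.fromConnectSchema(connectSchema);
            System.out.println(avroSchema.toString(true));
        }
    }
    ```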
    
    ### Verifying this change
    
    Unit tests and integration tests
---
 kafka-connect-avro-converter-shaded/pom.xml        | 128 +++++++++++++++++
 pom.xml                                            |  10 ++
 .../pulsar/client/impl/schema/StructSchema.java    |   6 +
 .../impl/schema/generic/GenericAvroReader.java     |  18 ++-
 .../impl/schema/generic/GenericAvroSchema.java     |   4 +
 .../impl/schema/generic/GenericAvroReaderTest.java |  24 +++-
 .../org/apache/pulsar/functions/api/KVRecord.java  |  35 +++++
 .../org/apache/pulsar/functions/api/Record.java    |   5 +
 .../pulsar/functions/instance/SinkRecord.java      |  25 ++++
 .../apache/pulsar/functions/sink/PulsarSink.java   |  38 +++--
 .../org/apache/pulsar/functions/LocalRunner.java   |   2 +-
 pulsar-io/kafka-connect-adaptor/pom.xml            |  16 +++
 .../io/kafka/connect/KafkaConnectSource.java       | 109 +++++++++++++-
 .../connect/schema/KafkaSchemaWrappedSchema.java   |  69 +++++++++
 site2/docs/io-debezium-source.md                   |  18 +++
 .../integration/containers/ChaosContainer.java     |   1 -
 .../integration/functions/PulsarFunctionsTest.java | 156 ++++++++++++---------
 .../io/DebeziumMongoDbSourceTester.java            |   1 +
 .../integration/io/DebeziumMySqlSourceTester.java  |   6 +-
 .../io/DebeziumPostgreSqlSourceTester.java         |   1 +
 .../pulsar/tests/integration/io/SourceTester.java  |  64 ++++++++-
 21 files changed, 645 insertions(+), 91 deletions(-)

diff --git a/kafka-connect-avro-converter-shaded/pom.xml b/kafka-connect-avro-converter-shaded/pom.xml
new file mode 100644
index 0000000..1006367
--- /dev/null
+++ b/kafka-connect-avro-converter-shaded/pom.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>pulsar</artifactId>
+        <groupId>org.apache.pulsar</groupId>
+        <version>2.6.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>kafka-connect-avro-converter-shaded</artifactId>
+    <name>Apache Pulsar :: Kafka Connect Avro Converter shaded</name>
+
+    <dependencies>
+        <!-- confluent connect avro converter -->
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-connect-avro-converter</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+            <version>${kafka-avro-convert-jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>${kafka-avro-convert-jackson.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+
+                    <artifactSet>
+                        <includes>
+                            <include>io.confluent:*</include>
+                            <include>io.confluent:kafka-avro-serializer</include>
+                            <include>io.confluent:kafka-schema-registry-client</include>
+                            <include>io.confluent:common-config</include>
+                            <include>io.confluent:common-utils</include>
+                            <include>org.apache.avro:*</include>
+
+                            <include>org.codehaus.jackson:jackson-core-asl</include>
+                            <include>org.codehaus.jackson:jackson-mapper-asl</include>
+                            <include>com.thoughtworks.paranamer:paranamer</include>
+                            <include>org.xerial.snappy:snappy-java</include>
+                            <include>org.apache.commons:commons-compress</include>
+                            <include>org.tukaani:xz</include>
+                        </includes>
+                    </artifactSet>
+                    <relocations>
+                        <relocation>
+                            <pattern>io.confluent</pattern>
+                            <shadedPattern>org.apache.pulsar.kafka.shade.io.confluent</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>org.apache.avro</pattern>
+                            <shadedPattern>org.apache.pulsar.kafka.shade.avro</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>org.codehaus.jackson</pattern>
+                            <shadedPattern>org.apache.pulsar.kafka.shade.org.codehaus.jackson</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>com.thoughtworks.paranamer</pattern>
+                            <shadedPattern>org.apache.pulsar.kafka.shade.com.thoughtworks.paranamer</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>org.xerial.snappy</pattern>
+                            <shadedPattern>org.apache.pulsar.kafka.shade.org.xerial.snappy</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>org.apache.commons</pattern>
+                            <shadedPattern>org.apache.pulsar.kafka.shade.org.apache.commons</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>org.tukaani</pattern>
+                            <shadedPattern>org.apache.pulsar.kafka.shade.org.tukaani</shadedPattern>
+                        </relocation>
+                    </relocations>
+                    <transformers>
+                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                        <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
+                    </transformers>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 471abc2..ddeda8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,6 +117,9 @@ flexible messaging model and an intuitive client API.</description>
     <!-- connector-related modules -->
     <module>pulsar-io</module>
 
+    <!-- kafka connect avro converter, shaded to work around the Avro version mismatch -->
+    <module>kafka-connect-avro-converter-shaded</module>
+
     <!-- examples -->
     <module>examples</module>
 
@@ -208,6 +211,8 @@ flexible messaging model and an intuitive client API.</description>
     <guava.version>25.1-jre</guava.version>
     <jcip.version>1.0</jcip.version>
     <prometheus-jmx.version>0.12.0</prometheus-jmx.version>
+    <confluent.version>5.3.2</confluent.version>
+    <kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
 
     <!-- test dependencies -->
     <cassandra.version>3.6.0</cassandra.version>
@@ -220,6 +225,7 @@ flexible messaging model and an intuitive client API.</description>
     <javassist.version>3.25.0-GA</javassist.version>
     <failsafe.version>2.3.1</failsafe.version>
     <skyscreamer.version>1.5.0</skyscreamer.version>
+    <confluent.version>5.2.2</confluent.version>
 
     <!-- Plugin dependencies -->
     <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
@@ -1715,5 +1721,9 @@ flexible messaging model and an intuitive client API.</description>
       <id>spring-plugins-release</id>
       <url>https://repo.spring.io/plugins-release/</url>
     </repository>
+    <repository>
+      <id>confluent</id>
+      <url>http://packages.confluent.io/maven/</url>
+    </repository>
   </repositories>
 </project>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index f9faa0b..fc0608b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -78,6 +79,11 @@ public abstract class StructSchema<T> extends AbstractSchema<T> {
     protected StructSchema(SchemaInfo schemaInfo) {
         this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
         this.schemaInfo = schemaInfo;
+
+        if (schemaInfo.getProperties().containsKey(GenericAvroSchema.OFFSET_PROP)) {
+            this.schema.addProp(GenericAvroSchema.OFFSET_PROP,
+                    schemaInfo.getProperties().get(GenericAvroSchema.OFFSET_PROP));
+        }
     }
 
     public org.apache.avro.Schema getAvroSchema() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
index 2ac0c2f..22c63a9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
@@ -47,6 +47,8 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
     private final List<Field> fields;
     private final Schema schema;
     private final byte[] schemaVersion;
+    private int offset;
+
     public GenericAvroReader(Schema schema) {
         this(null, schema, null);
     }
@@ -65,12 +67,22 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
         }
         this.byteArrayOutputStream = new ByteArrayOutputStream();
         this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
+
+        if (schema.getObjectProp(GenericAvroSchema.OFFSET_PROP) != null) {
+            this.offset = Integer.parseInt(schema.getObjectProp(GenericAvroSchema.OFFSET_PROP).toString());
+        } else {
+            this.offset = 0;
+        }
+
     }
 
     @Override
     public GenericAvroRecord read(byte[] bytes, int offset, int length) {
         try {
-            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, offset, length, null);
+            if (offset == 0 && this.offset > 0) {
+                offset = this.offset;
+            }
+            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, offset, length - offset, null);
             org.apache.avro.generic.GenericRecord avroRecord =
                     (org.apache.avro.generic.GenericRecord)reader.read(
                     null,
@@ -101,5 +113,9 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
         }
     }
 
+    public int getOffset() {
+        return offset;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(GenericAvroReader.class);
 }
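
For context on the offset handling above, a hedged sketch (assuming the Confluent wire
format, which is what an `__AVRO_READ_OFFSET__` of 5 corresponds to): registry-aware
serializers prefix each Avro payload with a magic byte (0) and a 4-byte schema id, so the
Avro body starts at offset 5, exactly what the unit test below reproduces with
`writeByte(0)` / `writeInt(10)`. (`WireFormatSketch` is a hypothetical name.)

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Confluent wire format: [magic byte 0][4-byte schema id][Avro body].
    static byte[] frame(int schemaId, byte[] avroBody) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroBody.length);
        buf.put((byte) 0);      // magic byte
        buf.putInt(schemaId);   // schema registry id
        buf.put(avroBody);      // Avro-encoded record starts at offset 5
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame(10, new byte[]{1, 2, 3});
        System.out.println("Avro body starts at offset " + (framed.length - 3)); // 5
    }
}
```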
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 98e646e..94c5ba1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -33,6 +33,8 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 @Slf4j
 public class GenericAvroSchema extends GenericSchemaImpl {
 
+    public final static String OFFSET_PROP = "__AVRO_READ_OFFSET__";
+
     public GenericAvroSchema(SchemaInfo schemaInfo) {
         this(schemaInfo, true);
     }
@@ -73,6 +75,8 @@ public class GenericAvroSchema extends GenericSchemaImpl {
                  schemaInfo);
              Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
              Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
+             readerSchema.addProp(OFFSET_PROP, schemaInfo.getProperties().getOrDefault(OFFSET_PROP, "0"));
+
              return new GenericAvroReader(
                      writerSchema,
                      readerSchema,
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java
index d77d0f2..1571794 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.impl.schema.generic;
 
 import static org.testng.Assert.assertEquals;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -38,11 +40,12 @@ public class GenericAvroReaderTest {
     private AvroSchema fooSchemaNotNull;
     private AvroSchema fooSchema;
     private AvroSchema fooV2Schema;
-
+    private AvroSchema fooOffsetSchema;
 
     @BeforeMethod
     public void setup() {
         fooSchema = AvroSchema.of(Foo.class);
+
         fooV2Schema = AvroSchema.of(FooV2.class);
         fooSchemaNotNull = AvroSchema.of(SchemaDefinition
                 .builder()
@@ -50,6 +53,9 @@ public class GenericAvroReaderTest {
                 .withPojo(Foo.class)
                 .build());
 
+        fooOffsetSchema = AvroSchema.of(Foo.class);
+        fooOffsetSchema.getAvroSchema().addProp(GenericAvroSchema.OFFSET_PROP, 5);
+
         foo = new Foo();
         foo.setField1("foo1");
         foo.setField2("bar1");
@@ -83,4 +89,20 @@ public class GenericAvroReaderTest {
         assertEquals(genericRecordByReaderSchema.getField("field3"), 10);
     }
 
+    @Test
+    public void testOffsetSchema() {
+        byte[] fooBytes = fooOffsetSchema.encode(foo);
+        ByteBuf byteBuf = Unpooled.buffer();
+        byteBuf.writeByte(0);
+        byteBuf.writeInt(10);
+        byteBuf.writeBytes(fooBytes);
+
+        GenericAvroReader reader = new GenericAvroReader(fooOffsetSchema.getAvroSchema());
+        assertEquals(reader.getOffset(), 5);
+        GenericRecord record = reader.read(byteBuf.array());
+        assertEquals(record.getField("field1"), "foo1");
+        assertEquals(record.getField("field2"), "bar1");
+        assertEquals(record.getField("fieldUnableNull"), "notNull");
+    }
+
 }
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java
new file mode 100644
index 0000000..812e8c6
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+
+/**
+ * key value schema record.
+ */
+public interface KVRecord<K, V> extends Record {
+
+    Schema<K> getKeySchema();
+
+    Schema<V> getValueSchema();
+
+    KeyValueEncodingType getKeyValueEncodingType();
+
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 3c3d7e8..9280cd7 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.api;
 
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
 
 import java.util.Collections;
 import java.util.Map;
@@ -43,6 +44,10 @@ public interface Record<T> {
         return Optional.empty();
     }
 
+    default Schema<T> getSchema() {
+        return null;
+    }
+
     /**
      * Retrieves the actual data of the record.
      *
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 3e1d5c0..71a3984 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -24,8 +24,13 @@ import java.util.Optional;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.functions.api.Record;
 
+@Slf4j
 @Data
 @AllArgsConstructor
 public class SinkRecord<T> implements Record<T> {
@@ -81,4 +86,24 @@ public class SinkRecord<T> implements Record<T> {
     public Optional<String> getDestinationTopic() {
         return sourceRecord.getDestinationTopic();
     }
+
+    @Override
+    public Schema<T> getSchema() {
+        if (sourceRecord == null) {
+            return null;
+        }
+
+        if (sourceRecord.getSchema() != null) {
+            return sourceRecord.getSchema();
+        }
+
+        if (sourceRecord instanceof KVRecord) {
+            KVRecord kvRecord = (KVRecord) sourceRecord;
+            return KeyValueSchema.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(),
+                    kvRecord.getKeyValueEncodingType());
+        }
+
+        return null;
+    }
+
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 43100e4..8cb67ef 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -31,7 +31,9 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 import org.apache.pulsar.functions.instance.SinkRecord;
@@ -104,18 +106,18 @@ public class PulsarSink<T> implements Sink<T> {
             return builder.properties(properties).create();
         }
 
-        protected Producer<T> getProducer(String destinationTopic) {
-            return getProducer(destinationTopic, null, destinationTopic);
+        protected Producer<T> getProducer(String destinationTopic, Schema schema) {
+            return getProducer(destinationTopic, null, destinationTopic, schema);
         }
 
-        protected Producer<T> getProducer(String producerId, String producerName, String topicName) {
+        protected Producer<T> getProducer(String producerId, String producerName, String topicName, Schema schema) {
             return publishProducers.computeIfAbsent(producerId, s -> {
                 try {
                     return createProducer(
                             client,
                             topicName,
                             producerName,
-                            schema);
+                            schema != null ? schema : this.schema);
                 } catch (PulsarClientException e) {
                     log.error("Failed to create Producer while doing user publish", e);
                     throw new RuntimeException(e);
@@ -177,7 +179,17 @@ public class PulsarSink<T> implements Sink<T> {
 
         @Override
         public TypedMessageBuilder<T> newMessage(Record<T> record) {
-            return getProducer(record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic())).newMessage();
+            if (record.getSchema() != null) {
+                return getProducer(record
+                        .getDestinationTopic()
+                        .orElse(pulsarSinkConfig.getTopic()), record.getSchema())
+                        .newMessage(record.getSchema());
+            } else {
+                return getProducer(record
+                        .getDestinationTopic()
+                        .orElse(pulsarSinkConfig.getTopic()), record.getSchema())
+                        .newMessage();
+            }
         }
 
         @Override
@@ -215,11 +227,17 @@ public class PulsarSink<T> implements Sink<T> {
                 throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
             }
 
-            return getProducer(
+            Producer<T> producer = getProducer(
                     String.format("%s-%s",record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()), record.getPartitionId().get()),
                     record.getPartitionId().get(),
-                    record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic())
-            ).newMessage();
+                    record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()),
+                    record.getSchema()
+            );
+            if (record.getSchema() != null) {
+                return producer.newMessage(record.getSchema());
+            } else {
+                return producer.newMessage();
+            }
         }
 
         @Override
@@ -274,7 +292,9 @@ public class PulsarSink<T> implements Sink<T> {
     @Override
     public void write(Record<T> record) {
         TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(record);
-        if (record.getKey().isPresent()) {
+
+        if (record.getKey().isPresent() && !(record.getSchema() instanceof KeyValueSchema &&
+                ((KeyValueSchema) record.getSchema()).getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED)) {
             msg.key(record.getKey().get());
         }
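
A hedged producer sketch illustrating the key handling above (assumes a broker at
pulsar://localhost:6650; the topic name is illustrative): with
`KeyValueEncodingType.SEPARATED` the key travels in the message key, encoded by the
`KeyValueSchema` itself, which is why `write()` must not also set the key from
`record.getKey()`.

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

public class SeparatedKeyValueProduceSketch {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build()) {
            Producer<KeyValue<String, String>> producer = client.newProducer(
                    Schema.KeyValue(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
                    .topic("kv-demo")
                    .create();
            producer.send(new KeyValue<>("k1", "v1")); // key is stored in the message key
            producer.close();
        }
    }
}
```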
 
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 244a757..b9ac10e 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -261,7 +261,7 @@ public class LocalRunner {
                 } else {
                     File file = new File(userCodeFile);
                     if (!file.exists()) {
-                        throw new RuntimeException("Source archive does not exist");
+                        throw new RuntimeException("Source archive (" + userCodeFile + ") does not exist");
                     }
                     functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
                 }
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 854489b..5609cd1 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -69,6 +69,22 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>kafka-connect-avro-converter-shaded</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.confluent</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-broker</artifactId>
       <version>${project.version}</version>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index ed059bf..a178b4b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -34,6 +34,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.functions.api.KVRecord;
+import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter;
+import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.runtime.TaskConfig;
@@ -45,10 +53,14 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
+import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
 
 /**
  * A pulsar source that runs
@@ -74,6 +86,13 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
     // number of outstandingRecords that have been polled but not been acked
     private AtomicInteger outstandingRecords = new AtomicInteger(0);
 
+    private boolean jsonWithEnvelope = false;
+    static private final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";
+
+    private final Cache<org.apache.kafka.connect.data.Schema, KafkaSchemaWrappedSchema> readerCache =
+            CacheBuilder.newBuilder().maximumSize(10000)
+                    .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
         Map<String, String> stringConfig = new HashMap<>();
@@ -83,6 +102,14 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
             }
         });
 
+        if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) {
+            jsonWithEnvelope = Boolean.parseBoolean(config.get(JSON_WITH_ENVELOPE_CONFIG).toString());
+            config.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, jsonWithEnvelope);
+        } else {
+            config.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
+        }
+        log.info("jsonWithEnvelope: {}", jsonWithEnvelope);
+
         // get the source class name from config and create source task from reflection
         sourceTask = ((Class<? extends SourceTask>)Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)))
             .asSubclass(SourceTask.class)
@@ -101,6 +128,14 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
             .getDeclaredConstructor()
             .newInstance();
 
+        if (keyConverter instanceof AvroConverter) {
+            keyConverter = new AvroConverter(new MockSchemaRegistryClient());
+            config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
+        }
+        if (valueConverter instanceof AvroConverter) {
+            valueConverter = new AvroConverter(new MockSchemaRegistryClient());
+            config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
+        }
         keyConverter.configure(config, true);
         valueConverter.configure(config, false);
 
@@ -161,7 +196,9 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
 
     @Override
     public void close() {
-        sourceTask.stop();
+        if (sourceTask != null) {
+            sourceTask.stop();
+        }
     }
 
     private synchronized Record<KeyValue<byte[], byte[]>> processSourceRecord(final SourceRecord srcRecord) {
@@ -174,7 +211,7 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
     private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
     private static long FLUSH_TIMEOUT_MS = 2000;
 
-    private class KafkaSourceRecord implements Record<KeyValue<byte[], byte[]>>  {
+    private class KafkaSourceRecord implements KVRecord<byte[], byte[]> {
         @Getter
         Optional<String> key;
         @Getter
@@ -188,15 +225,42 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
         @Getter
         Optional<String> destinationTopic;
 
+        KafkaSchemaWrappedSchema keySchema;
+
+        KafkaSchemaWrappedSchema valueSchema;
+
         KafkaSourceRecord(SourceRecord srcRecord) {
+            AvroData avroData = new AvroData(1000);
             byte[] keyBytes = keyConverter.fromConnectData(
-                srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
-            byte[] valueBytes = valueConverter.fromConnectData(
-                srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
+                    srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
             this.key = keyBytes != null ? Optional.of(Base64.getEncoder().encodeToString(keyBytes)) : Optional.empty();
-            this.value = new KeyValue(keyBytes, valueBytes);
+
+            byte[] valueBytes = valueConverter.fromConnectData(
+                    srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
+
+            this.value = new KeyValue<>(keyBytes, valueBytes);
 
             this.topicName = Optional.of(srcRecord.topic());
+
+            if (srcRecord.keySchema() != null) {
+                keySchema = readerCache.getIfPresent(srcRecord.keySchema());
+            }
+            if (srcRecord.valueSchema() != null) {
+                valueSchema = readerCache.getIfPresent(srcRecord.valueSchema());
+            }
+
+            if (srcRecord.keySchema() != null && keySchema == null) {
+                keySchema = new KafkaSchemaWrappedSchema(
+                        avroData.fromConnectSchema(srcRecord.keySchema()), keyConverter);
+                readerCache.put(srcRecord.keySchema(), keySchema);
+            }
+
+            if (srcRecord.valueSchema() != null && valueSchema == null) {
+                valueSchema = new KafkaSchemaWrappedSchema(
+                        avroData.fromConnectSchema(srcRecord.valueSchema()), valueConverter);
+                readerCache.put(srcRecord.valueSchema(), valueSchema);
+            }
+
             this.eventTime = Optional.ofNullable(srcRecord.timestamp());
             this.partitionId = Optional.of(srcRecord.sourcePartition()
                 .entrySet()
@@ -207,6 +271,38 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
         }
 
         @Override
+        public Schema<byte[]> getKeySchema() {
+            if (jsonWithEnvelope || keySchema == null) {
+                return Schema.BYTES;
+            } else {
+                return keySchema;
+            }
+        }
+
+        @Override
+        public Schema<byte[]> getValueSchema() {
+            if (jsonWithEnvelope || valueSchema == null) {
+                return Schema.BYTES;
+            } else {
+                return valueSchema;
+            }
+        }
+
+        @Override
+        public KeyValueEncodingType getKeyValueEncodingType() {
+            if (jsonWithEnvelope) {
+                return KeyValueEncodingType.INLINE;
+            } else {
+                return KeyValueEncodingType.SEPARATED;
+            }
+        }
+
+        @Override
+        public Schema getSchema() {
+            return null;
+        }
+
+        @Override
         public Optional<Long> getRecordSequence() {
             return RECORD_SEQUENCE;
         }
@@ -275,4 +371,5 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
             }
         }
     }
+
 }
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
new file mode 100644
index 0000000..2db9d6c
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect.schema;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * Wrapped schema for kafka connect schema.
+ */
+@Slf4j
+public class KafkaSchemaWrappedSchema implements Schema<byte[]>, Serializable {
+
+    private SchemaInfo schemaInfo = null;
+
+    public KafkaSchemaWrappedSchema(org.apache.pulsar.kafka.shade.avro.Schema schema,
+                                    Converter converter) {
+        Map<String, String> props = new HashMap<>();
+        boolean isJsonConverter = converter instanceof JsonConverter;
+        props.put(GenericAvroSchema.OFFSET_PROP, isJsonConverter ? "0" : "5");
+        this.schemaInfo = SchemaInfo.builder()
+                .name(isJsonConverter? "KafKaJson" : "KafkaAvro")
+                .type(isJsonConverter ? SchemaType.JSON : SchemaType.AVRO)
+                .schema(schema.toString().getBytes(UTF_8))
+                .properties(props)
+                .build();
+    }
+
+    @Override
+    public byte[] encode(byte[] data) {
+        return data;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return schemaInfo;
+    }
+
+    @Override
+    public Schema<byte[]> clone() {
+        return null;
+    }
+}
diff --git a/site2/docs/io-debezium-source.md b/site2/docs/io-debezium-source.md
index bfa282c..360a5bf 100644
--- a/site2/docs/io-debezium-source.md
+++ b/site2/docs/io-debezium-source.md
@@ -28,6 +28,24 @@ The configuration of Debezium source connector has the following properties.
 | `database.history.pulsar.service.url` | true | null | Pulsar cluster service URL for history topic. |
 | `pulsar.service.url` | true | null | Pulsar cluster service URL. |
 | `offset.storage.topic` | true | null | Record the last committed offsets that the connector successfully completes. |
+| `json-with-envelope` | false | false | Whether the consumed message consists of both schema and payload, or of the payload only. |
+
+### Converter Options
+
+1. org.apache.kafka.connect.json.JsonConverter
+
+The `json-with-envelope` config is valid only for the JsonConverter. Its default value is false: the consumer uses the schema
+`Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)`,
+and the message consists of the payload only.
+
+If `json-with-envelope` is set to true, the consumer uses the schema
+`Schema.KeyValue(Schema.BYTES, Schema.BYTES)`, and the message consists of both schema and payload.
+
+2. org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter
+
+If you select the AvroConverter, the Pulsar consumer should use the schema
+`Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)`,
+and the message consists of the payload only.
 ### MongoDB Configuration
 | Name | Required | Default | Description |
 |------|----------|---------|-------------|
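
A hedged consumer sketch for the AvroConverter case documented above (assumes a broker at
pulsar://localhost:6650; the topic name matches the integration test further below, and
`DebeziumConsumeSketch` is a hypothetical name):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

public class DebeziumConsumeSketch {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build()) {
            Consumer<KeyValue<GenericRecord, GenericRecord>> consumer = client.newConsumer(
                    Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(),
                            KeyValueEncodingType.SEPARATED))
                    .topic("debezium/mysql-avro/dbserver1.inventory.products")
                    .subscriptionName("debezium-source-tester")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();
            Message<KeyValue<GenericRecord, GenericRecord>> msg = consumer.receive();
            // SEPARATED encoding: key decoded from the message key, value from the payload.
            System.out.println(msg.getValue().getValue().getFields());
            consumer.acknowledge(msg);
        }
    }
}
```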
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 7305849..a66f635 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -29,7 +29,6 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.testcontainers.containers.GenericContainer;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 70e9f45..07262ee 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -42,8 +43,10 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
@@ -63,15 +66,18 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.assertj.core.api.Assertions;
 import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.shaded.com.google.common.collect.Sets;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -81,6 +87,7 @@ import java.util.concurrent.TimeUnit;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -140,18 +147,24 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     }
 
     @Test(groups = "source")
-    public void testDebeziumMySqlSource() throws Exception {
-        testDebeziumMySqlConnect();
+    public void testDebeziumMySqlSourceJson() throws Exception {
+        testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
+    }
+
+    @Test(groups = "source")
+    public void testDebeziumMySqlSourceAvro() throws Exception {
+        testDebeziumMySqlConnect(
+                "org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false);
     }
 
     @Test(groups = "source")
     public void testDebeziumPostgreSqlSource() throws Exception {
-        testDebeziumPostgreSqlConnect();
+        testDebeziumPostgreSqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
     }
 
     @Test(groups = "source")
     public void testDebeziumMongoDbSource() throws Exception{
-        testDebeziumMongoDbConnect();
+        testDebeziumMongoDbConnect("org.apache.kafka.connect.json.JsonConverter", true);
     }
 
     private void testSink(SinkTester tester, boolean builtin) throws Exception {
@@ -2262,15 +2275,17 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         getFunctionInfoNotFound(functionName);
     }
 
-    private  void testDebeziumMySqlConnect()
-        throws Exception {
+    private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "debe-output-topic-name";
-        final String consumeTopicName = "public/default/dbserver1.inventory.products";
-        final String sourceName = "test-source-connector-"
-            + functionRuntimeType + "-name-" + randomName(8);
+        boolean isJsonConverter = converterClassName.endsWith("JsonConverter");
+        final String consumeTopicName = "debezium/mysql-"
+                + (isJsonConverter ? "json" : "avro")
+                + "/dbserver1.inventory.products";
+        final String sourceName = "test-source-debezium-mysql" + (isJsonConverter ? "json" : "avro")
+                + "-" + functionRuntimeType + "-" + randomName(8);
 
         // This is the binlog count that contained in mysql container.
         final int numMessages = 47;
@@ -2287,28 +2302,21 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         @Cleanup
         PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+        initNamespace(admin);
+
         try {
-            // If topic already exists, we should delete it so as not to affect the following tests.
-            admin.topics().getStats(consumeTopicName);
-            admin.topics().delete(consumeTopicName);
-            admin.schemas().deleteSchema(consumeTopicName);
-        } catch (PulsarAdminException e) {
-            // Expected results, ignoring the exception
-            log.info("Topic: {} does not exist, we can continue the following tests. Exceptions message: {}",
+            SchemaInfo lastSchemaInfo = admin.schemas().getSchemaInfo(consumeTopicName);
+            log.info("lastSchemaInfo: {}", lastSchemaInfo == null ? "null" : lastSchemaInfo.toString());
+        } catch (Exception e) {
+            log.warn("failed to get schemaInfo for topic: {}, exceptions message: {}",
                     consumeTopicName, e.getMessage());
         }
-        admin.topics().createNonPartitionedTopic(consumeTopicName);
-        admin.topics().createNonPartitionedTopic(outputTopicName);
 
-        @Cleanup
-        Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
-            .topic(consumeTopicName)
-            .subscriptionName("debezium-source-tester")
-            .subscriptionType(SubscriptionType.Exclusive)
-            .subscribe();
+        admin.topics().createNonPartitionedTopic(outputTopicName);
 
         @Cleanup
-        DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster);
+        DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster, converterClassName);
+        sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
 
         // setup debezium mysql server
         DebeziumMySQLContainer mySQLContainer = new DebeziumMySQLContainer(pulsarCluster.getClusterName());
@@ -2330,26 +2338,35 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         Failsafe.with(statusRetryPolicy).run(() ->
                 waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
 
+        @Cleanup
+        Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
+                .topic(consumeTopicName)
+                .subscriptionName("debezium-source-tester")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        log.info("[debezium mysql test] create consumer finish. converterName: {}", converterClassName);
+
         // validate the source result
-        sourceTester.validateSourceResult(consumer, 9, null);
+        sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
 
         // prepare insert event
         sourceTester.prepareInsertEvent();
 
         // validate the source insert event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
 
         // prepare update event
         sourceTester.prepareUpdateEvent();
 
         // validate the source update event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE, converterClassName);
 
         // prepare delete event
         sourceTester.prepareDeleteEvent();
 
         // validate the source delete event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE, converterClassName);
 
         // delete the source
         deleteSource(tenant, namespace, sourceName);
@@ -2358,14 +2375,14 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         getSourceInfoNotFound(tenant, namespace, sourceName);
     }
 
-    private  void testDebeziumPostgreSqlConnect() throws Exception {
+    private  void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "debe-output-topic-name";
-        final String consumeTopicName = "public/default/dbserver1.inventory.products";
-        final String sourceName = "test-source-connector-"
-                + functionRuntimeType + "-name-" + randomName(8);
+        final String consumeTopicName = "debezium/postgresql/dbserver1.inventory.products";
+        final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8);
+
 
         // This is the binlog count that contained in postgresql container.
         final int numMessages = 26;
@@ -2382,21 +2399,13 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         @Cleanup
         PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
-        try {
-            // If topic already exists, we should delete it so as not to affect the following tests.
-            admin.topics().getStats(consumeTopicName);
-            admin.topics().delete(consumeTopicName);
-            admin.schemas().deleteSchema(consumeTopicName);
-        } catch (PulsarAdminException e) {
-            // Expected results, ignoring the exception
-            log.info("Topic: {} does not exist, we can continue the following tests. Exceptions message: {}",
-                    consumeTopicName, e.getMessage());
-        }
+        initNamespace(admin);
+
         admin.topics().createNonPartitionedTopic(consumeTopicName);
         admin.topics().createNonPartitionedTopic(outputTopicName);
 
         @Cleanup
-        Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
+        Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
                 .topic(consumeTopicName)
                 .subscriptionName("debezium-source-tester")
                 .subscriptionType(SubscriptionType.Exclusive)
@@ -2404,6 +2413,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         @Cleanup
         DebeziumPostgreSqlSourceTester sourceTester = new DebeziumPostgreSqlSourceTester(pulsarCluster);
+        sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
 
         // setup debezium postgresql server
         DebeziumPostgreSqlContainer postgreSqlContainer = new DebeziumPostgreSqlContainer(pulsarCluster.getClusterName());
@@ -2426,25 +2436,25 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                 waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
 
         // validate the source result
-        sourceTester.validateSourceResult(consumer, 9, null);
+        sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
 
         // prepare insert event
         sourceTester.prepareInsertEvent();
 
         // validate the source insert event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
 
         // prepare update event
         sourceTester.prepareUpdateEvent();
 
         // validate the source update event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE, converterClassName);
 
         // prepare delete event
         sourceTester.prepareDeleteEvent();
 
         // validate the source delete event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE, converterClassName);
 
         // delete the source
         deleteSource(tenant, namespace, sourceName);
@@ -2453,12 +2463,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         getSourceInfoNotFound(tenant, namespace, sourceName);
     }
 
-    private  void testDebeziumMongoDbConnect() throws Exception {
+    private  void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "debe-output-topic-name";
-        final String consumeTopicName = "public/default/dbserver1.inventory.products";
+        final String consumeTopicName = "debezium/mongodb/dbserver1.inventory.products";
         final String sourceName = "test-source-connector-"
                 + functionRuntimeType + "-name-" + randomName(8);
 
@@ -2477,21 +2487,13 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         @Cleanup
         PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
-        try {
-            // If topic already exists, we should delete it so as not to affect the following tests.
-            admin.topics().getStats(consumeTopicName);
-            admin.topics().delete(consumeTopicName);
-            admin.schemas().deleteSchema(consumeTopicName);
-        } catch (PulsarAdminException e) {
-            // Expected results, ignoring the exception
-            log.info("Topic: {} does not exist, we can continue the following tests. Exceptions message: {}",
-                    consumeTopicName, e.getMessage());
-        }
+        initNamespace(admin);
+
         admin.topics().createNonPartitionedTopic(consumeTopicName);
         admin.topics().createNonPartitionedTopic(outputTopicName);
 
         @Cleanup
-        Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
+        Consumer consumer = client.newConsumer(getSchema(jsonWithEnvelope))
                 .topic(consumeTopicName)
                 .subscriptionName("debezium-source-tester")
                 .subscriptionType(SubscriptionType.Exclusive)
@@ -2499,6 +2501,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         @Cleanup
         DebeziumMongoDbSourceTester sourceTester = new DebeziumMongoDbSourceTester(pulsarCluster);
+        sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);
 
         // setup debezium mongodb server
         DebeziumMongoDbContainer mongoDbContainer = new DebeziumMongoDbContainer(pulsarCluster.getClusterName());
@@ -2520,25 +2523,25 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                 waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
 
         // validate the source result
-        sourceTester.validateSourceResult(consumer, 9, null);
+        sourceTester.validateSourceResult(consumer, 9, null, converterClassName);
 
         // prepare insert event
         sourceTester.prepareInsertEvent();
 
         // validate the source insert event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
 
         // prepare update event
         sourceTester.prepareUpdateEvent();
 
         // validate the source update event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE, converterClassName);
 
         // prepare delete event
         sourceTester.prepareDeleteEvent();
 
         // validate the source delete event
-        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE);
+        sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE, converterClassName);
 
         // delete the source
         deleteSource(tenant, namespace, sourceName);
@@ -2547,4 +2550,27 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         getSourceInfoNotFound(tenant, namespace, sourceName);
     }
 
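+    /**
+     * Pre-creates the "debezium" tenant and the namespaces the connector tests publish to.
+     * Failures (e.g. tenant or namespace already exists on re-runs) are logged and ignored.
+     */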
+    private void initNamespace(PulsarAdmin admin) {
+        log.info("[initNamespace] start.");
+        try {
+            admin.tenants().createTenant("debezium", new TenantInfo(Sets.newHashSet(),
+                    Sets.newHashSet(pulsarCluster.getClusterName())));
+            admin.namespaces().createNamespace("debezium/mysql-json");
+            admin.namespaces().createNamespace("debezium/mysql-avro");
+            admin.namespaces().createNamespace("debezium/mongodb");
+            admin.namespaces().createNamespace("debezium/postgresql");
+        } catch (Exception e) {
+            log.info("[initNamespace] msg: {}", e.getMessage());
+        }
+        log.info("[initNamespace] finish.");
+    }
+
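+    /**
+     * JSON-with-envelope output is consumed as raw key/value bytes; otherwise AUTO_CONSUME
+     * lets the topic's registered schema drive decoding, with key and value stored separately.
+     */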
+    private Schema getSchema(boolean jsonWithEnvelope) {
+        if (jsonWithEnvelope) {
+            return KeyValueSchema.kvBytes();
+        } else {
+            return KeyValueSchema.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED);
+        }
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
index 23b5db4..6fa35ef 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
@@ -50,6 +50,7 @@ public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbCon
         sourceConfig.put("mongodb.task.id","1");
         sourceConfig.put("database.whitelist", "inventory");
         sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("topic.namespace", "debezium/mongodb");
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index 3287e2b..4b66506 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -48,7 +48,7 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
 
     private final PulsarCluster pulsarCluster;
 
-    public DebeziumMySqlSourceTester(PulsarCluster cluster) {
+    public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassName) {
         super(NAME);
         this.pulsarCluster = cluster;
         pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
@@ -61,6 +61,10 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
         sourceConfig.put("database.server.name", "dbserver1");
         sourceConfig.put("database.whitelist", "inventory");
         sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("key.converter", converterClassName);
+        sourceConfig.put("value.converter", converterClassName);
+        sourceConfig.put("topic.namespace", "debezium/mysql-" +
+                (converterClassName.endsWith("AvroConverter") ? "avro" : "json"));
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
index e0efff2..a8deb4c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
@@ -63,6 +63,7 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgre
         sourceConfig.put("schema.whitelist", "inventory");
         sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom");
         sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+        sourceConfig.put("topic.namespace", "debezium/postgresql");
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
index 5040de5..27037ab 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -18,13 +18,17 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.Assert;
@@ -46,6 +50,14 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
     protected final String sourceType;
     protected final Map<String, Object> sourceConfig;
 
+    public static final Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
+        add("before");
+        add("after");
+        add("source");
+        add("op");
+        add("ts_ms");
+    }};
+
     protected SourceTester(String sourceType) {
         this.sourceType = sourceType;
         this.sourceConfig = Maps.newHashMap();
@@ -71,7 +83,16 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
 
     public abstract Map<String, String> produceSourceMessages(int numMessages) throws Exception;
 
-    public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number, String eventType) throws Exception {
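+    // Dispatch on the configured Kafka converter class: Avro-converted topics are
+    // validated structurally as GenericRecords, JSON topics via substring checks.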
+    public void validateSourceResult(Consumer consumer, int number,
+                                     String eventType, String converterClassName) throws Exception {
+        if (converterClassName.endsWith("AvroConverter")) {
+            validateSourceResultAvro(consumer, number, eventType);
+        } else {
+            validateSourceResultJson(consumer, number, eventType);
+        }
+    }
+
+    public void validateSourceResultJson(Consumer<KeyValue<byte[], byte[]>> consumer, int number, String eventType) throws Exception {
         int recordsNumber = 0;
         Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
         while(msg != null) {
@@ -82,7 +103,36 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
             Assert.assertTrue(key.contains(this.keyContains()));
             Assert.assertTrue(value.contains(this.valueContains()));
             if (eventType != null) {
-                Assert.assertTrue(value.contains(this.eventContains(eventType)));
+                Assert.assertTrue(value.contains(this.eventContains(eventType, true)));
+            }
+            consumer.acknowledge(msg);
+            msg = consumer.receive(1, TimeUnit.SECONDS);
+        }
+
+        Assert.assertEquals(recordsNumber, number);
+        log.info("Stop {} server container. topic: {} has {} records.", getSourceType(), consumer.getTopic(), recordsNumber);
+    }
+
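+    /**
+     * Avro path: each message is a KeyValue of GenericRecords. Assert that key and value
+     * records are non-empty, that value fields stay within the Debezium envelope, and that
+     * the "op" code matches the expected event type when one is given.
+     */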
+    public void validateSourceResultAvro(Consumer<KeyValue<GenericRecord, GenericRecord>> consumer,
+                                         int number, String eventType) throws Exception {
+        int recordsNumber = 0;
+        Message<KeyValue<GenericRecord, GenericRecord>> msg = consumer.receive(2, TimeUnit.SECONDS);
+        while (msg != null) {
+            recordsNumber++;
+            GenericRecord keyRecord = msg.getValue().getKey();
+            Assert.assertNotNull(keyRecord.getFields());
+            Assert.assertTrue(keyRecord.getFields().size() > 0);
+
+            GenericRecord valueRecord = msg.getValue().getValue();
+            Assert.assertNotNull(valueRecord.getFields());
+            Assert.assertTrue(valueRecord.getFields().size() > 0);
+            for (Field field : valueRecord.getFields()) {
+                Assert.assertTrue(DEBEZIUM_FIELD_SET.contains(field.getName()));
+            }
+
+            if (eventType != null) {
+                String op = valueRecord.getField("op").toString();
+                Assert.assertEquals(op, this.eventContains(eventType, false));
             }
             consumer.acknowledge(msg);
             msg = consumer.receive(1, TimeUnit.SECONDS);
@@ -91,20 +141,22 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
         Assert.assertEquals(recordsNumber, number);
         log.info("Stop {} server container. topic: {} has {} records.", getSourceType(), consumer.getTopic(), recordsNumber);
     }
+
     public String keyContains(){
         return "dbserver1.inventory.products.Key";
     }
+
     public String valueContains(){
         return "dbserver1.inventory.products.Value";
     }
 
-    public String eventContains(String eventType) {
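+    // Debezium encodes the operation as "c" (insert), "u" (update) or "d" (delete);
+    // JSON validation matches the serialized field, Avro validation the bare op code.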
+    public String eventContains(String eventType, boolean isJson) {
         if (eventType.equals(INSERT)) {
-            return "\"op\":\"c\"";
+            return isJson ? "\"op\":\"c\"" : "c";
         } else if (eventType.equals(UPDATE)) {
-            return "\"op\":\"u\"";
+            return isJson ? "\"op\":\"u\"" : "u";
         } else {
-            return "\"op\":\"d\"";
+            return isJson ? "\"op\":\"d\"" : "d";
         }
     }
 }