You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/04 20:20:44 UTC

[incubator-pulsar] branch master updated: Issue 2313: create a JDBC sink connector (#2440)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 79ac46a  Issue 2313: create a JDBC sink connector (#2440)
79ac46a is described below

commit 79ac46a6ae50e48bed4ccb680d1f7945611f0565
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Wed Sep 5 04:20:42 2018 +0800

    Issue 2313: create a JDBC sink connector (#2440)
    
    ### Motivation
    
    This change is trying to add a basic JDBC sink connector.
    
    ### Modifications
    
    Add the jdbc module to the pulsar-io sub-module.
    Add unit test and integration test for it.
    
    ### Result
    
    Unit and integration tests pass.
    
    Master Issue: #2442
---
 distribution/io/src/assemble/io.xml                |   5 +
 pom.xml                                            |   9 +-
 pulsar-io/jdbc/lombok.config                       |  23 +++
 pulsar-io/jdbc/pom.xml                             |  96 ++++++++++
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    | 197 +++++++++++++++++++++
 .../apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java  |  92 ++++++++++
 .../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java  |  63 +++++++
 .../java/org/apache/pulsar/io/jdbc/JdbcUtils.java  | 178 +++++++++++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 +++
 .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java    | 135 ++++++++++++++
 .../org/apache/pulsar/io/jdbc/JdbcUtilsTest.java   |  95 ++++++++++
 .../org/apache/pulsar/io/jdbc/SqliteUtils.java     | 111 ++++++++++++
 pulsar-io/pom.xml                                  |   1 +
 tests/integration/pom.xml                          |  21 +++
 .../integration/functions/PulsarFunctionsTest.java |  54 +++++-
 .../functions/PulsarFunctionsTestBase.java         |   2 +-
 .../tests/integration/io/JdbcSinkTester.java       | 137 ++++++++++++++
 .../tests/integration/suites/PulsarTestSuite.java  |  11 ++
 18 files changed, 1245 insertions(+), 7 deletions(-)

diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 8cf7fce..bb75e84 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -75,6 +75,11 @@
       <fileMode>644</fileMode>
     </file>
     <file>
+      <source>${basedir}/../../pulsar-io/jdbc/target/pulsar-io-jdbc-${project.version}.nar</source>
+      <outputDirectory>connectors</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
+    <file>
       <source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source>
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
diff --git a/pom.xml b/pom.xml
index 1ed2802..681f18e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,6 +167,8 @@ flexible messaging model and an intuitive client API.</description>
     <aws-sdk.version>1.11.297</aws-sdk.version>
     <avro.version>1.8.2</avro.version>
     <jclouds.version>2.1.1</jclouds.version>
+    <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
+    <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
     <presto.version>0.206</presto.version>
     <flink.version>1.6.0</flink.version>
     <scala.binary.version>2.11</scala.binary.version>
@@ -822,6 +824,11 @@ flexible messaging model and an intuitive client API.</description>
         <version>${testcontainers.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>mysql</artifactId>
+        <version>${testcontainers.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.arquillian.cube</groupId>
         <artifactId>arquillian-cube-docker</artifactId>
         <version>${arquillian-cube.version}</version>
@@ -1086,7 +1093,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             <exclude>bin/proto/MLDataFormats_pb2.py</exclude>
-            
+
             <!-- pulasr-io-connector kinesis : auto generated files from flatbuffer schema -->
             <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
diff --git a/pulsar-io/jdbc/lombok.config b/pulsar-io/jdbc/lombok.config
new file mode 100644
index 0000000..9a9adee
--- /dev/null
+++ b/pulsar-io/jdbc/lombok.config
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+## This file is to fix the conflict with jackson error like this:
+##    com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of ...
+lombok.anyConstructor.addConstructorProperties=true
+config.stopBubbling = true
diff --git a/pulsar-io/jdbc/pom.xml b/pulsar-io/jdbc/pom.xml
new file mode 100644
index 0000000..eed8588
--- /dev/null
+++ b/pulsar-io/jdbc/pom.xml
@@ -0,0 +1,96 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-jdbc</artifactId>
+  <name>Pulsar IO :: Jdbc</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-instance</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.xerial</groupId>
+      <artifactId>sqlite-jdbc</artifactId>
+      <version>${sqlite-jdbc.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql-jdbc.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
new file mode 100644
index 0000000..425fb57
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A simple abstract base class for JDBC sinks.
+ *
+ * <p>Records passed to {@link #write(Record)} are buffered in memory and inserted into the
+ * configured table in batches: a flush is scheduled when the buffer reaches {@code batchSize}
+ * records, and a periodic flush runs every {@code timeoutMs} milliseconds. Records are acked only
+ * after the batch has been committed, and failed if the batch could not be written.
+ *
+ * <p>Subclasses implement {@link #bindValue(PreparedStatement, Record)} to map a record onto the
+ * parameters of the prepared INSERT statement.
+ */
+@Slf4j
+public abstract class JdbcAbstractSink<T> implements Sink<T> {
+    // ----- Runtime fields
+    private JdbcSinkConfig jdbcSinkConfig;
+    @Getter
+    private Connection connection;
+    private String jdbcUrl;
+    private String tableName;
+
+    private JdbcUtils.TableId tableId;
+    private PreparedStatement insertStatement;
+
+    // TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
+    protected String schema;
+    protected JdbcUtils.TableDefinition tableDefinition;
+
+    // for flush
+    // Guards incomingList. A dedicated lock object is used because the incomingList reference
+    // itself is swapped on every flush, so it cannot safely serve as its own monitor.
+    private final Object listLock = new Object();
+    private List<Record<T>> incomingList;
+    private List<Record<T>> swapList;
+    private AtomicBoolean isFlushing;
+    private int timeoutMs;
+    private int batchSize;
+    private ScheduledExecutorService flushExecutor;
+
+    /**
+     * Loads the sink configuration, opens the JDBC connection (with auto-commit disabled),
+     * prepares the INSERT statement for the configured table and starts the periodic flush task.
+     *
+     * @param config      sink configuration map, converted into a {@link JdbcSinkConfig}
+     * @param sinkContext sink context (not used by this class)
+     * @throws IllegalArgumentException if no jdbc url is configured
+     * @throws Exception if the connection cannot be opened or the table cannot be inspected
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+        jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+        if (jdbcUrl == null) {
+            throw new IllegalArgumentException("Required jdbc Url not set.");
+        }
+
+        Properties properties = new Properties();
+        String username = jdbcSinkConfig.getUserName();
+        String password = jdbcSinkConfig.getPassword();
+        if (username != null) {
+            properties.setProperty("user", username);
+        }
+        if (password != null) {
+            properties.setProperty("password", password);
+        }
+
+        connection = JdbcUtils.getConnection(jdbcUrl, properties);
+        connection.setAutoCommit(false);
+        log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
+
+        schema = jdbcSinkConfig.getSchema();
+        tableName = jdbcSinkConfig.getTableName();
+        tableId = JdbcUtils.getTableId(connection, tableName);
+        tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+        insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+
+        timeoutMs = jdbcSinkConfig.getTimeoutMs();
+        batchSize = jdbcSinkConfig.getBatchSize();
+        incomingList = Lists.newArrayList();
+        swapList = Lists.newArrayList();
+        isFlushing = new AtomicBoolean(false);
+
+        flushExecutor = Executors.newScheduledThreadPool(1);
+        flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Shuts down the flush executor, commits any open transaction and closes the connection.
+     * Safe to call even if {@link #open(Map, SinkContext)} failed before the executor or the
+     * connection were created.
+     */
+    @Override
+    public void close() throws Exception {
+        // Request shutdown of the flush executor before committing and closing the connection.
+        if (flushExecutor != null) {
+            flushExecutor.shutdown();
+        }
+        if (connection != null) {
+            try {
+                if (!connection.getAutoCommit()) {
+                    connection.commit();
+                }
+            } finally {
+                // Always release the connection, even if the final commit fails.
+                connection.close();
+            }
+        }
+        log.info("Closed jdbc connection: {}", jdbcUrl);
+    }
+
+    /**
+     * Buffers the record and schedules an immediate flush once the buffer holds exactly
+     * {@code batchSize} records.
+     *
+     * @param record the record to write
+     */
+    @Override
+    public void write(Record<T> record) throws Exception {
+        int number;
+        synchronized (listLock) {
+            incomingList.add(record);
+            number = incomingList.size();
+        }
+
+        if (number == batchSize) {
+            flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Binds the value of the given record to the parameters of the prepared statement.
+     *
+     * @param statement the prepared INSERT statement to bind values on
+     * @param message   the record whose value should be bound
+     * @throws Exception if the record cannot be decoded or a value cannot be bound
+     */
+    public abstract void bindValue(
+        PreparedStatement statement,
+        Record<T> message) throws Exception;
+
+
+    /**
+     * Writes all currently buffered records as one batch. Does nothing if another flush is
+     * already in progress or if there is nothing buffered.
+     */
+    private void flush() {
+        // Only one flush may run at a time; a concurrent caller simply returns.
+        if (!isFlushing.compareAndSet(false, true)) {
+            if (log.isDebugEnabled()) {
+                int queued;
+                synchronized (listLock) {
+                    queued = incomingList.size();
+                }
+                log.debug("Already in flushing state, will not flush, queue size: {}", queued);
+            }
+            return;
+        }
+
+        try {
+            synchronized (listLock) {
+                if (incomingList.isEmpty()) {
+                    return;
+                }
+                // Swap the buffers: swapList is always empty here because it is cleared at the
+                // end of every flush (see the finally block below).
+                List<Record<T>> tmpList = swapList;
+                swapList = incomingList;
+                incomingList = tmpList;
+            }
+
+            log.debug("Starting flush, queue size: {}", swapList.size());
+            writeBatch(swapList);
+            log.debug("Finish flush, queue size: {}", swapList.size());
+        } finally {
+            // Without this clear the next swap would hand already-flushed records back to
+            // write() as the new incoming buffer.
+            swapList.clear();
+            isFlushing.set(false);
+        }
+    }
+
+    /**
+     * Inserts the given records in a single JDBC batch and commits. Records are acked after a
+     * successful commit; on any failure the batch is discarded and every record is failed.
+     */
+    private void writeBatch(List<Record<T>> records) {
+        try {
+            // bind each record value
+            for (Record<T> record : records) {
+                bindValue(insertStatement, record);
+                insertStatement.addBatch();
+            }
+
+            int updateCount = 0;
+            boolean noInfo = false;
+            for (int updates : insertStatement.executeBatch()) {
+                if (updates == Statement.SUCCESS_NO_INFO) {
+                    // The driver succeeded but did not report a row count for this statement.
+                    noInfo = true;
+                    continue;
+                }
+                updateCount += updates;
+            }
+            connection.commit();
+            // Ack only once the rows are durably committed.
+            records.forEach(tRecord -> tRecord.ack());
+
+            // A count mismatch is only meaningful when the driver reported every row count.
+            if (!noInfo && records.size() != updateCount) {
+                log.error("Update count {}  not match total number of records {}", updateCount, records.size());
+            }
+        } catch (Exception e) {
+            log.error("Got exception ", e);
+            discardFailedBatch();
+            records.forEach(tRecord -> tRecord.fail());
+        }
+    }
+
+    /**
+     * Drops any statements still queued on the prepared statement and rolls back the open
+     * transaction, so that a failed batch cannot leak into the next flush.
+     */
+    private void discardFailedBatch() {
+        try {
+            insertStatement.clearBatch();
+            connection.rollback();
+        } catch (Exception e) {
+            log.error("Failed to roll back jdbc batch ", e);
+        }
+    }
+
+}
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
new file mode 100644
index 0000000..ec28220
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import java.sql.PreparedStatement;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
+
+/**
+ * A simple JDBC sink which assumes that the value of every input record is Avro binary-encoded
+ * according to the schema string given in the sink configuration.
+ *
+ * <p>For each column of the target table, the Avro field with the same name is looked up in the
+ * decoded record and bound to the corresponding parameter of the INSERT statement.
+ */
+@Slf4j
+public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> {
+
+    private Schema avroSchema = null;
+    private DatumReader<GenericRecord> reader = null;
+
+
+    /**
+     * Opens the underlying JDBC sink and prepares the Avro reader from the configured schema.
+     *
+     * @throws IllegalArgumentException if no schema is configured
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        super.open(config, sinkContext);
+        if (schema == null) {
+            throw new IllegalArgumentException("Required avro schema not set.");
+        }
+        // get reader, and read value out as GenericRecord
+        if (avroSchema == null || reader == null) {
+            // Schema.parse(String) is deprecated; Schema.Parser is its replacement.
+            avroSchema = new Schema.Parser().parse(schema);
+            reader = new GenericDatumReader<>(avroSchema);
+        }
+        log.info("open JdbcAvroSchemaSink with schema: {}, and tableDefinition: {}", schema, tableDefinition.toString());
+    }
+
+
+    /**
+     * Decodes the record value as an Avro {@link GenericRecord} and binds one statement parameter
+     * per table column, in the order of the table definition. A field that is absent from the
+     * record, or whose value is null, is bound as SQL NULL using the column's SQL type.
+     *
+     * @param statement the prepared INSERT statement
+     * @param message   the record carrying the Avro-encoded value
+     * @throws Exception if decoding fails or a value has an unsupported type
+     */
+    @Override
+    public void bindValue(PreparedStatement statement,
+                          Record<byte[]> message) throws Exception {
+
+        byte[] value = message.getValue();
+        GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+
+        int index = 1;
+        for (ColumnId columnId : tableDefinition.getColumns()) {
+            String colName = columnId.getName();
+            Object obj = record.get(colName);
+            if (obj == null) {
+                statement.setNull(index, columnId.getType());
+            } else {
+                setColumnValue(statement, index, obj);
+            }
+            index++;
+            // Per-column logging is far too chatty for INFO on the write path.
+            log.debug("set column value: {}", obj);
+        }
+    }
+
+    /**
+     * Binds a single non-null value using the setter that matches its Java type.
+     *
+     * @throws Exception if the value's type is not supported
+     */
+    private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
+        if (value instanceof Integer) {
+            statement.setInt(index, (Integer) value);
+        } else if (value instanceof Long) {
+            statement.setLong(index, (Long) value);
+        } else if (value instanceof Double) {
+            statement.setDouble(index, (Double) value);
+        } else if (value instanceof Float) {
+            statement.setFloat(index, (Float) value);
+        } else if (value instanceof Boolean) {
+            statement.setBoolean(index, (Boolean) value);
+        } else if (value instanceof CharSequence) {
+            // Covers Avro's Utf8 as well as plain java.lang.String values.
+            statement.setString(index, value.toString());
+        } else if (value instanceof Short) {
+            statement.setShort(index, (Short) value);
+        } else {
+            throw new Exception("Not support value type, need to add it. " + value.getClass());
+        }
+    }
+}
+
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
new file mode 100644
index 0000000..3419811
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration of the JDBC sink: connection credentials, target table, the schema of the input
+ * topic, and the batching parameters used when flushing records.
+ *
+ * <p>The password is excluded from {@code toString()} so that it does not end up in log output.
+ */
+@Data
+@ToString(exclude = "password")
+@Accessors(chain = true)
+public class JdbcSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String userName;
+    private String password;
+    private String jdbcUrl;
+    private String tableName;
+
+    // schema for input topic
+    private String schema;
+
+    // Optional
+    // Interval between periodic flushes, in milliseconds.
+    private int timeoutMs = 500;
+    // Number of buffered records that triggers an immediate flush.
+    private int batchSize = 200;
+
+    /**
+     * Loads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static JdbcSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class);
+    }
+
+    /**
+     * Builds the configuration from a key/value map by round-tripping it through JSON.
+     *
+     * @param map configuration entries keyed by field name
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be serialized or does not match this class
+     */
+    public static JdbcSinkConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), JdbcSinkConfig.class);
+    }
+}
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
new file mode 100644
index 0000000..e959909
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Helpers for opening JDBC connections, reading table metadata and building INSERT statements.
+ */
+@Slf4j
+public class JdbcUtils {
+
+    /**
+     * Identifies a table by catalog, schema and table name.
+     */
+    @Data(staticConstructor = "of")
+    public static class TableId {
+        private final String catalogName;
+        private final String schemaName;
+        private final String tableName;
+    }
+
+    /**
+     * Describes one column of a table.
+     */
+    @Data(staticConstructor = "of")
+    public static class ColumnId {
+        private final TableId tableId;
+        private final String name;
+        // SQL type from java.sql.Types
+        private final int type;
+        private final String typeName;
+        // column position in table
+        private final int position;
+    }
+
+    /**
+     * A table together with the list of its columns.
+     */
+    @Data(staticConstructor = "of")
+    public static class TableDefinition {
+        private final TableId tableId;
+        private final List<ColumnId> columns;
+    }
+
+    /**
+     * Given a driver type(such as mysql), return its jdbc driver class name.
+     * TODO: test and support more types, also add Driver in pom file.
+     *
+     * @param driver the driver type taken from the jdbc url
+     * @return the fully qualified driver class name
+     * @throws Exception if the driver type is null or not one of the supported types
+     */
+    public static String getDriverClassName(String driver) throws Exception {
+        // Constant-first equals keeps a null driver on the "not tested" path instead of an NPE.
+        if ("mysql".equals(driver)) {
+            return "com.mysql.jdbc.Driver";
+        } else if ("sqlite".equals(driver)) {
+            return "org.sqlite.JDBC";
+        } else {
+            throw new Exception("Not tested jdbc driver type: " + driver);
+        }
+    }
+
+    /**
+     * Get the {@link Connection} for the given jdbcUrl.
+     *
+     * <p>The driver type is taken from the second colon-separated part of the url
+     * (for example {@code mysql} in {@code jdbc:mysql://...}) and its driver class is loaded
+     * before the connection is opened.
+     *
+     * @param jdbcUrl    the jdbc url
+     * @param properties connection properties such as user and password
+     * @throws IllegalArgumentException if the url is null or contains no driver type
+     * @throws Exception if the driver is unsupported or the connection cannot be opened
+     */
+    public static Connection getConnection(String jdbcUrl, Properties properties) throws Exception {
+        if (jdbcUrl == null) {
+            throw new IllegalArgumentException("jdbc url must not be null");
+        }
+        String[] parts = jdbcUrl.split(":");
+        if (parts.length < 2) {
+            throw new IllegalArgumentException("Invalid jdbc url: " + jdbcUrl);
+        }
+        String driverClassName = getDriverClassName(parts[1]);
+        Class.forName(driverClassName);
+
+        return DriverManager.getConnection(jdbcUrl, properties);
+    }
+
+    /**
+     * Get the {@link TableId} for the given tableName.
+     *
+     * @throws Exception if no table with that name is found
+     */
+    public static TableId getTableId(Connection connection, String tableName) throws Exception {
+        DatabaseMetaData metadata = connection.getMetaData();
+        try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) {
+            if (rs.next()) {
+                String catalogName = rs.getString(1);
+                String schemaName = rs.getString(2);
+                String gotTableName = rs.getString(3);
+                checkState(tableName.equals(gotTableName),
+                    "TableName not match: " + tableName + " Got: " + gotTableName);
+                if (log.isDebugEnabled()) {
+                    log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName);
+                }
+                return TableId.of(catalogName, schemaName, tableName);
+            } else {
+                throw new Exception("Not able to find table: " + tableName);
+            }
+        }
+    }
+
+    /**
+     * Get the {@link TableDefinition} for the given table.
+     *
+     * <p>Columns are added in the order in which the database metadata returns them.
+     */
+    public static TableDefinition getTableDefinition(Connection connection, TableId tableId) throws Exception {
+        TableDefinition table = TableDefinition.of(tableId, Lists.newArrayList());
+
+        try (ResultSet rs = connection.getMetaData().getColumns(
+            tableId.getCatalogName(),
+            tableId.getSchemaName(),
+            tableId.getTableName(),
+            null
+        )) {
+            while (rs.next()) {
+                // Column indexes of the getColumns() result set:
+                // 4 = COLUMN_NAME, 5 = DATA_TYPE, 6 = TYPE_NAME, 17 = ORDINAL_POSITION.
+                final String columnName = rs.getString(4);
+
+                final int sqlDataType = rs.getInt(5);
+                final String typeName = rs.getString(6);
+                final int position = rs.getInt(17);
+
+                table.columns.add(ColumnId.of(tableId, columnName, sqlDataType, typeName, position));
+                if (log.isDebugEnabled()) {
+                    log.debug("Get column. name: {}, data type: {}, position: {}", columnName, typeName, position);
+                }
+            }
+            return table;
+        }
+    }
+
+    /**
+     * Builds an {@code INSERT INTO table(col1,col2,...) VALUES(?,?,...)} statement with one
+     * placeholder per column of the given table.
+     *
+     * @param table the table definition to build the statement for
+     * @return the INSERT sql text
+     * @throws IllegalArgumentException if the table has no columns
+     */
+    public static String buildInsertSql(TableDefinition table) {
+        List<ColumnId> columns = table.getColumns();
+        String name = table.getTableId().getTableName();
+        if (columns.isEmpty()) {
+            // Without this guard the generated text would be malformed sql.
+            throw new IllegalArgumentException("Table has no columns: " + name);
+        }
+
+        StringBuilder names = new StringBuilder();
+        StringBuilder placeholders = new StringBuilder();
+        for (ColumnId columnId : columns) {
+            if (names.length() > 0) {
+                names.append(",");
+                placeholders.append(",");
+            }
+            names.append(columnId.getName());
+            placeholders.append("?");
+        }
+
+        return "INSERT INTO " + name + "(" + names + ") VALUES(" + placeholders + ")";
+    }
+
+    /**
+     * Prepares the given INSERT sql on the connection.
+     */
+    public static PreparedStatement buildInsertStatement(Connection connection, String insertSQL) throws SQLException {
+        return connection.prepareStatement(insertSQL);
+    }
+
+}
diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..d9d06bd
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: jdbc
+description: Jdbc sink
+sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
new file mode 100644
index 0000000..33bb859
--- /dev/null
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link JdbcAvroSchemaSink}, run against a throw-away SQLite database
+ * managed by {@link SqliteUtils}.
+ */
+@Slf4j
+public class JdbcSinkTest {
+    private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+
+    /**
+     * Simple POJO used to derive the Avro schema; its fields mirror the columns of the test table.
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+        private int field3;
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        sqliteUtils.setUp();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        sqliteUtils.tearDown();
+    }
+
+    /**
+     * Opens the sink against a freshly created table, writes a single Avro-encoded record and
+     * verifies that exactly one row with the same field values can be read back.
+     */
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        JdbcAvroSchemaSink jdbcSink;
+        Map<String, Object> conf;
+        String tableName = "TestOpenAndWriteSink";
+
+        String jdbcUrl = sqliteUtils.sqliteUri();
+        conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);
+
+        jdbcSink = new JdbcAvroSchemaSink();
+
+        sqliteUtils.createTable(
+            "CREATE TABLE " + tableName + "(" +
+                "    field1  TEXT," +
+                "    field2  TEXT," +
+                "    field3 INTEGER," +
+                "PRIMARY KEY (field1));"
+        );
+
+        // prepare a foo Record; field1 and field2 carry different values so that a
+        // column mix-up in the sink cannot go unnoticed.
+        Foo obj = new Foo();
+        obj.setField1("ValueOfField1");
+        obj.setField2("ValueOfField2");
+        obj.setField3(3);
+        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+        String schemaDefinition = new String(schema.getSchemaInfo().getSchema());
+        conf.put("schema",  schemaDefinition);
+        log.info("schema: {}", schemaDefinition);
+
+        byte[] bytes = schema.encode(obj);
+        ByteBuf payload = Unpooled.copiedBuffer(bytes);
+        Message<byte[]> message = new MessageImpl("77:777", conf, payload, Schema.BYTES);
+        Record<byte[]> record = PulsarRecord.<byte[]>builder()
+            .message(message)
+            .topicName("fake_topic_name")
+            .build();
+
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+            obj.toString(),
+            message.getValue().toString(),
+            record.getValue().toString());
+
+        // change batchSize to 1, to flush on each write.
+        conf.put("batchSize", 1);
+        // open should success
+        jdbcSink.open(conf, null);
+
+        try {
+            // write should success.
+            jdbcSink.write(record);
+            log.info("executed write");
+            // sleep to wait backend flush complete
+            Thread.sleep(500);
+
+            // value has been written to db, read it out and verify.
+            String querySql = "SELECT * FROM " + tableName;
+            int rowCount = sqliteUtils.select(querySql, (resultSet) -> {
+                Assert.assertEquals(resultSet.getString(1), obj.getField1());
+                Assert.assertEquals(resultSet.getString(2), obj.getField2());
+                Assert.assertEquals(resultSet.getInt(3), obj.getField3());
+            });
+
+            // the callback only runs per returned row, so an empty table must fail explicitly.
+            Assert.assertEquals(rowCount, 1, "exactly one row should have been written by the sink");
+        } finally {
+            // release the sink even when a verification above fails.
+            jdbcSink.close();
+        }
+    }
+
+}
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
new file mode 100644
index 0000000..d58802d
--- /dev/null
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableId;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the table lookup, table definition and INSERT-building helpers of {@link JdbcUtils}.
+ */
+@Slf4j
+public class JdbcUtilsTest {
+
+    private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+
+    @BeforeMethod
+    public void setUp() throws IOException, SQLException {
+        sqliteUtils.setUp();
+    }
+
+    @AfterMethod
+    public void tearDown() throws IOException, SQLException {
+        sqliteUtils.tearDown();
+    }
+
+    /**
+     * Creates a ten-column table and checks the table id, a sample of the column
+     * definitions and the generated INSERT statement.
+     */
+    @Test
+    public void TestGetTableId() throws Exception {
+        final String tableName = "TestGetTableId";
+        final String createSql =
+            "CREATE TABLE " + tableName + "(" +
+                "    firstName  TEXT," +
+                "    lastName  TEXT," +
+                "    age INTEGER," +
+                "    bool  NUMERIC," +
+                "    byte  INTEGER," +
+                "    short INTEGER NULL," +
+                "    long INTEGER," +
+                "    float NUMERIC," +
+                "    double NUMERIC," +
+                "    bytes BLOB, " +
+                "PRIMARY KEY (firstName, lastName));";
+        sqliteUtils.createTable(createSql);
+
+        final Connection connection = sqliteUtils.getConnection();
+
+        // Test getTableId
+        log.info("verify getTableId");
+        final TableId tableId = JdbcUtils.getTableId(connection, tableName);
+        Assert.assertEquals(tableId.getTableName(), tableName);
+
+        // Test getTableDefinition
+        log.info("verify getTableDefinition");
+        final TableDefinition definition = JdbcUtils.getTableDefinition(connection, tableId);
+        assertColumn(definition, 0, "firstName", "TEXT");
+        assertColumn(definition, 2, "age", "INTEGER");
+        assertColumn(definition, 7, "float", "NUMERIC");
+
+        // Test buildInsertSql
+        log.info("verify buildInsertSql");
+        final String expectedStatement = "INSERT INTO " + tableName +
+            "(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" +
+            " VALUES(?,?,?,?,?,?,?,?,?,?)";
+        final String actualStatement = JdbcUtils.buildInsertSql(definition);
+        Assert.assertEquals(actualStatement, expectedStatement);
+    }
+
+    /**
+     * Asserts the name and type name of the column at {@code index} in the table definition.
+     */
+    private static void assertColumn(TableDefinition table, int index, String name, String typeName) {
+        Assert.assertEquals(table.getColumns().get(index).getName(), name);
+        Assert.assertEquals(table.getColumns().get(index).getTypeName(), typeName);
+    }
+
+}
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
new file mode 100644
index 0000000..3b4a01a
--- /dev/null
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public final class SqliteUtils {
+
+    static {
+        try {
+            Class.forName("org.sqlite.JDBC");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public interface ResultSetReadCallback {
+        void read(final ResultSet rs) throws SQLException;
+    }
+
+    private final Path dbPath;
+
+    private Connection connection;
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public SqliteUtils(String testId) {
+        dbPath = Paths.get(testId + ".db");
+    }
+
+    public String sqliteUri() {
+        return "jdbc:sqlite:" + dbPath;
+    }
+
+    public void setUp() throws SQLException, IOException {
+        Files.deleteIfExists(dbPath);
+        connection = DriverManager.getConnection(sqliteUri());
+        connection.setAutoCommit(false);
+    }
+
+    public void tearDown() throws SQLException, IOException {
+        connection.close();
+        Files.deleteIfExists(dbPath);
+    }
+
+    public void createTable(final String createSql) throws SQLException {
+        execute(createSql);
+    }
+
+    public void deleteTable(final String table) throws SQLException {
+        execute("DROP TABLE IF EXISTS " + table);
+
+        //random errors of table not being available happens in the unit tests
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public int select(final String query, final SqliteUtils.ResultSetReadCallback callback) throws SQLException {
+        int count = 0;
+        try (Statement stmt = connection.createStatement()) {
+            try (ResultSet rs = stmt.executeQuery(query)) {
+                while (rs.next()) {
+                    callback.read(rs);
+                    count++;
+                }
+            }
+        }
+        return count;
+    }
+
+    public void execute(String sql) throws SQLException {
+        try (Statement stmt = connection.createStatement()) {
+            stmt.executeUpdate(sql);
+            connection.commit();
+        }
+    }
+
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index f1494db..e89cc02 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -38,6 +38,7 @@
     <module>kafka</module>
     <module>rabbitmq</module>
     <module>kinesis</module>
+    <module>jdbc</module>
     <module>data-genenator</module>
   </modules>
 
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index a3a4694..6d3fdc4 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -78,6 +78,27 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>mysql</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql-jdbc.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-jdbc</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 67136a6..5cefc6a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -36,18 +36,22 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
 import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
 import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
 import org.apache.pulsar.tests.integration.io.SinkTester;
 import org.apache.pulsar.tests.integration.io.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
@@ -70,15 +74,20 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         testSink(new CassandraSinkTester());
     }
 
+    /**
+     * Runs the shared sink test flow with a {@link JdbcSinkTester}.
+     */
+    @Test
+    public void testJdbcSink() throws Exception {
+        final SinkTester jdbcTester = new JdbcSinkTester();
+        testSink(jdbcTester);
+    }
+
     private void testSink(SinkTester tester) throws Exception {
         tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String inputTopicName = "test-sink-connector-"
-            + functionRuntimeType + "-input-topic-" + randomName(8);
+            + tester.getSinkType() + "-" + functionRuntimeType + "-input-topic-" + randomName(8);
         final String sinkName = "test-sink-connector-"
-            + functionRuntimeType + "-name-" + randomName(8);
+            + tester.getSinkType() + "-" + functionRuntimeType + "-name-" + randomName(8);
         final int numMessages = 20;
 
         // prepare the testing environment for sink
@@ -94,7 +103,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         getSinkStatus(tenant, namespace, sinkName);
 
         // produce messages
-        Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
+        Map<String, String> kvs;
+        if (tester instanceof JdbcSinkTester) {
+            kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(Foo.class));
+        } else {
+            kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
+        }
 
         // wait for sink to process messages
         waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
@@ -202,6 +216,36 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         return kvs;
     }
 
+    /**
+     * Produces {@code numMessages} keyed messages to the input topic for the JDBC sink test.
+     * Each value is a {@code Foo} encoded with the given schema and then converted to a String,
+     * which is sent through a {@code Schema.STRING} producer.
+     *
+     * NOTE(review): the encoded bytes are turned into a String with the platform default
+     * charset, which is only lossless while every encoded byte maps to a character in that
+     * charset. JdbcSinkTester reverses this with {@code value.getBytes()}, so both sides
+     * must change together - confirm before producing larger or non-ASCII values.
+     *
+     * @param inputTopicName topic the messages are published to
+     * @param numMessages number of messages to produce
+     * @param schema schema used to encode each {@code Foo}
+     * @return the produced key/value pairs in publish order
+     */
+    protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName,
+                                                              int numMessages,  Schema schema) throws Exception {
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup
+        Producer<String> stringProducer = pulsarClient.newProducer(Schema.STRING)
+            .topic(inputTopicName)
+            .create();
+
+        Map<String, String> produced = new LinkedHashMap<>();
+        for (int index = 0; index < numMessages; index++) {
+            Foo foo = new Foo();
+            foo.setField1("field1_" + index);
+            foo.setField2("field2_" + index);
+            foo.setField3(index);
+
+            String messageKey = "key-" + index;
+            String encodedValue = new String(schema.encode(foo));
+
+            produced.put(messageKey, encodedValue);
+            producerSend:
+            {
+                stringProducer.newMessage()
+                    .key(messageKey)
+                    .value(encodedValue)
+                    .send();
+            }
+        }
+        return produced;
+    }
+
     protected void waitForProcessingMessages(String tenant,
                                              String namespace,
                                              String sinkName,
@@ -226,8 +270,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                 // expected in early iterations
             }
 
-            log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second",
-                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+            log.info("{} ms has elapsed but the sink {} hasn't process {} messages, backoff to wait for another 1 second",
+                stopwatch.elapsed(TimeUnit.MILLISECONDS), sinkName, numMessages);
             TimeUnit.SECONDS.sleep(1);
         }
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index fe4795d..7a47f77 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -52,7 +52,7 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
 
     @BeforeClass
     public void setupFunctionWorkers() {
-        final int numFunctionWorkers = 2;
+        final int numFunctionWorkers = 3;
         log.info("Setting up {} function workers : function runtime type = {}",
             numFunctionWorkers, functionRuntimeType);
         pulsarCluster.setupFunctionWorkers(randomName(5), functionRuntimeType, numFunctionWorkers);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
new file mode 100644
index 0000000..6a102f1
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+
+/**
+ * A tester for testing jdbc sink.
+ * This will use MySql as DB server
+ */
+@Slf4j
+public class JdbcSinkTester extends SinkTester {
+
+    /**
+     * A Simple class to test jdbc class,
+     *
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+        private int field3;
+    }
+
+    private static final String NAME = "jdbc";
+
+    private MySQLContainer mySQLContainer;
+    private AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+    private String tableName = "test";
+    private Connection connection;
+
+    public JdbcSinkTester() {
+        super(NAME);
+
+        // container default value is test
+        sinkConfig.put("userName", "test");
+        sinkConfig.put("password", "test");
+        sinkConfig.put("tableName", tableName);
+
+        // prepare schema
+        sinkConfig.put("schema",  new String(schema.getSchemaInfo().getSchema()));
+        log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
+        sinkConfig.put("batchSize", 1);
+    }
+
+    @Override
+    public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
+        GenericContainer<?> container = containers.get("mysql");
+        checkState(container instanceof MySQLContainer,
+            "No MySQL service found in the cluster");
+
+        this.mySQLContainer = (MySQLContainer) container;
+        log.info("find sink service container: {}", mySQLContainer.getContainerName());
+    }
+
+    @Override
+    public void prepareSink() throws Exception {
+        String jdbcUrl = mySQLContainer.getJdbcUrl();
+        // we need set mysql server address in cluster network.
+        sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test");
+        String driver = mySQLContainer.getDriverClassName();
+        Class.forName(driver);
+
+        connection = DriverManager.getConnection(jdbcUrl, "test", "test");
+        log.info("getConnection: {}, jdbcurl: {}", connection, jdbcUrl);
+
+        // create table
+        String createTable = "CREATE TABLE " + tableName +
+            " (field1 TEXT, field2 TEXT, field3 INTEGER, PRIMARY KEY (field3))";
+        int ret = connection.createStatement().executeUpdate(createTable);
+        log.info("created table in jdbc: {}, return value: {}", createTable, ret);
+    }
+
+    @Override
+    public void validateSinkResult(Map<String, String> kvs) {
+        log.info("Query table content from mysql server: {}", tableName);
+        String querySql = "SELECT * FROM " + tableName;
+        ResultSet rs;
+        try {
+            // backend flush may not complete.
+            Thread.sleep(1000);
+
+            PreparedStatement statement = connection.prepareStatement(querySql);
+            rs = statement.executeQuery();
+
+            while (rs.next()) {
+                String field1 = rs.getString(1);
+                String field2 = rs.getString(2);
+                int field3 = rs.getInt(3);
+
+                String value = kvs.get("key-" + field3);
+
+                Foo obj = schema.decode(value.getBytes());
+                assertEquals(obj.field1, field1);
+                assertEquals(obj.field2, field2);
+                assertEquals(obj.field3, field3);
+            }
+        } catch (Exception e) {
+            log.error("Got exception: ", e);
+            fail("Got exception when op sql.");
+            return;
+        }
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index e20a933..147f273 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarCl
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.MySQLContainer;
 import org.testng.ITest;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
@@ -51,6 +52,7 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
 
         // register external services
         Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+
         final String kafkaServiceName = "kafka";
         externalServices.put(
             kafkaServiceName,
@@ -60,10 +62,19 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
                 .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
                     .withName(kafkaServiceName)
                     .withHostName(clusterName + "-" + kafkaServiceName)));
+
         final String cassandraServiceName = "cassandra";
         externalServices.put(
             cassandraServiceName,
             new CassandraContainer(clusterName));
+
+        // use mySQL for jdbc test
+        final String jdbcServiceName = "mysql";
+        externalServices.put(
+            jdbcServiceName,
+            new MySQLContainer()
+                .withExposedPorts(3306));
+
         builder = builder.externalServices(externalServices);
 
         return builder;