You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/07 16:04:15 UTC

[pulsar] branch master updated: [pulsar-io][jdbc sink]Support delete and update event for JDBC Sink (#4358)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d2141b  [pulsar-io][jdbc sink]Support delete and update event for JDBC Sink (#4358)
7d2141b is described below

commit 7d2141b92d28117bee05499caca4c7a27839d025
Author: tuteng <eg...@gmail.com>
AuthorDate: Mon Jul 8 00:04:11 2019 +0800

    [pulsar-io][jdbc sink]Support delete and update event for JDBC Sink (#4358)
    
    ### Motivation
    
    Currently the JDBC Sink does not support delete and update events.
    This change adds support for delete and update events.
    
    ### Modifications
    
    Add support for delete and update events.
    Add documentation for the JDBC Sink. https://github.com/apache/pulsar/issues/4073
    
    ### Verifying this change
    
    Local unit tests pass.
    Integration tests pass.
---
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    |  75 +++++---
 .../apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java  |  21 ++-
 .../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java  |  14 +-
 .../java/org/apache/pulsar/io/jdbc/JdbcUtils.java  | 100 ++++++++++-
 .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java    | 191 ++++++++++++++++-----
 .../org/apache/pulsar/io/jdbc/JdbcUtilsTest.java   |  36 +++-
 site2/docs/io-connectors.md                        |   1 +
 site2/docs/io-jdbc.md                              |  23 +++
 .../integration/functions/PulsarFunctionsTest.java | 119 +++++++++++--
 .../tests/integration/io/JdbcSinkTester.java       |  12 +-
 10 files changed, 495 insertions(+), 97 deletions(-)

diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index a4c589f..40e2c23 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.Statement;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -51,6 +52,14 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
 
     private JdbcUtils.TableId tableId;
     private PreparedStatement insertStatement;
+    private PreparedStatement updateStatment;
+    private PreparedStatement deleteStatment;
+
+
+    protected static final String ACTION = "ACTION";
+    protected static final String INSERT = "INSERT";
+    protected static final String UPDATE = "UPDATE";
+    protected static final String DELETE = "DELETE";
 
     protected JdbcUtils.TableDefinition tableDefinition;
 
@@ -87,8 +96,8 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
 
         tableName = jdbcSinkConfig.getTableName();
         tableId = JdbcUtils.getTableId(connection, tableName);
-        tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
-        insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+        // Init PreparedStatement include insert, delete, update
+        initStatement();
 
         timeoutMs = jdbcSinkConfig.getTimeoutMs();
         batchSize = jdbcSinkConfig.getBatchSize();
@@ -100,6 +109,28 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
         flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
     }
 
+    private void initStatement()  throws Exception {
+        List<String> keyList = Lists.newArrayList();
+        String key = jdbcSinkConfig.getKey();
+        if (key !=null && !key.isEmpty()) {
+            keyList = Arrays.asList(key.split(","));
+        }
+        List<String> nonKeyList = Lists.newArrayList();
+        String nonKey = jdbcSinkConfig.getNonKey();
+        if (nonKey != null && !nonKey.isEmpty()) {
+            nonKeyList = Arrays.asList(nonKey.split(","));
+        }
+
+        tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList);
+        insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+        if (!nonKeyList.isEmpty()) {
+            updateStatment = JdbcUtils.buildUpdateStatement(connection, JdbcUtils.buildUpdateSql(tableDefinition));
+        }
+        if (!keyList.isEmpty()) {
+            deleteStatment = JdbcUtils.buildDeleteStatement(connection, JdbcUtils.buildDeleteSql(tableDefinition));
+        }
+    }
+
     @Override
     public void close() throws Exception {
         if (!connection.getAutoCommit()) {
@@ -115,11 +146,10 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
     @Override
     public void write(Record<T> record) throws Exception {
         int number;
-        synchronized (incomingList) {
+        synchronized (this) {
             incomingList.add(record);
             number = incomingList.size();
         }
-
         if (number == batchSize) {
             flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
         }
@@ -128,8 +158,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
     // bind value with a PreparedStetement
     public abstract void bindValue(
         PreparedStatement statement,
-        Record<T> message) throws Exception;
-
+        Record<T> message, String action) throws Exception;
 
     private void flush() {
         // if not in flushing state, do flush, else return;
@@ -140,8 +169,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
             if (!swapList.isEmpty()) {
                 throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + swapList.size());
             }
-
-            synchronized (incomingList) {
+            synchronized (this) {
                 List<Record<T>> tmpList;
                 swapList.clear();
 
@@ -150,22 +178,24 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
                 incomingList = tmpList;
             }
 
-            int updateCount = 0;
-            boolean noInfo = false;
+            int count = 0;
             try {
                 // bind each record value
                 for (Record<T> record : swapList) {
-                    bindValue(insertStatement, record);
-                    insertStatement.addBatch();
-                    record.ack();
-                }
-
-                for (int updates : insertStatement.executeBatch()) {
-                    if (updates == Statement.SUCCESS_NO_INFO) {
-                        noInfo = true;
-                        continue;
+                    String action = record.getProperties().get(ACTION);
+                    if (action != null && action.equals(DELETE)) {
+                        bindValue(deleteStatment, record, action);
+                        count += 1;
+                        deleteStatment.execute();
+                    } else if (action != null && action.equals(UPDATE)) {
+                        bindValue(updateStatment, record, action);
+                        count += 1;
+                        updateStatment.execute();
+                    } else if (action != null && action.equals(INSERT)){
+                        bindValue(insertStatement, record, action);
+                        count += 1;
+                        insertStatement.execute();
                     }
-                    updateCount += updateCount;
                 }
                 connection.commit();
                 swapList.forEach(tRecord -> tRecord.ack());
@@ -174,14 +204,15 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
                 swapList.forEach(tRecord -> tRecord.fail());
             }
 
-            if (swapList.size() != updateCount) {
-                log.error("Update count {}  not match total number of records {}", updateCount, swapList.size());
+            if (swapList.size() != count) {
+                log.error("Update count {}  not match total number of records {}", count, swapList.size());
             }
 
             // finish flush
             if (log.isDebugEnabled()) {
                 log.debug("Finish flush, queue size: {}", swapList.size());
             }
+            swapList.clear();
             isFlushing.set(false);
         } else {
             if (log.isDebugEnabled()) {
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index 640972a..de146c4 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -20,6 +20,9 @@
 package org.apache.pulsar.io.jdbc;
 
 import java.sql.PreparedStatement;
+import java.util.List;
+
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.functions.api.Record;
@@ -33,7 +36,7 @@ import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 @Connector(
     name = "jdbc",
     type = IOType.SINK,
-    help = "A simple JDBC sink that writes pulser messages to a database table",
+    help = "A simple JDBC sink that writes pulsar messages to a database table",
     configClass = JdbcSinkConfig.class
 )
 @Slf4j
@@ -41,16 +44,24 @@ public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
 
     @Override
     public void bindValue(PreparedStatement statement,
-                          Record<GenericRecord> message) throws Exception {
+                          Record<GenericRecord> message, String action) throws Exception {
 
         GenericRecord record = message.getValue();
+        List<ColumnId> columns = Lists.newArrayList();
+        if (action == null || action.equals(INSERT)) {
+            columns = tableDefinition.getColumns();
+        } else if (action.equals(DELETE)){
+            columns.addAll(tableDefinition.getKeyColumns());
+        } else if (action.equals(UPDATE)){
+            columns.addAll(tableDefinition.getNonKeyColumns());
+            columns.addAll(tableDefinition.getKeyColumns());
+        }
 
         int index = 1;
-        for (ColumnId columnId : tableDefinition.getColumns()) {
+        for (ColumnId columnId : columns) {
             String colName = columnId.getName();
             Object obj = record.getField(colName);
             setColumnValue(statement, index++, obj);
-            log.info("set column value: {}", obj.toString());
         }
     }
 
@@ -66,7 +77,7 @@ public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
         } else if (value instanceof Boolean) {
             statement.setBoolean(index, (Boolean) value);
         } else if (value instanceof String) {
-            statement.setString(index, (String )value);
+            statement.setString(index, (String)value);
         } else if (value instanceof Short) {
             statement.setShort(index, (Short) value);
         } else {
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 3bfc72c..7e238a0 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -66,7 +66,19 @@ public class JdbcSinkConfig implements Serializable {
         help = "The name of the table this connector writes messages to"
     )
     private String tableName;
-
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "Fields used in update events. A comma-separated list."
+    )
+    private String nonKey;
+    // Optional
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "Fields used in where condition of update and delete Events. A comma-separated list."
+    )
+    private String key;
     // Optional
     @FieldDoc(
         required = false,
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
index e959909..21fde90 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -28,8 +28,10 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.StringJoiner;
 import java.util.stream.IntStream;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -70,7 +72,6 @@ public class JdbcUtils {
         private final int position;
     }
 
-    @Data(staticConstructor = "of")
     @Setter
     @Getter
     @EqualsAndHashCode
@@ -78,6 +79,29 @@ public class JdbcUtils {
     public static class TableDefinition {
         private final TableId tableId;
         private final List<ColumnId> columns;
+        private final List<ColumnId> nonKeyColumns;
+        private final List<ColumnId> keyColumns;
+
+        private TableDefinition(TableId tableId, List<ColumnId> columns) {
+            this(tableId, columns, null, null);
+        }
+        private TableDefinition(TableId tableId, List<ColumnId> columns,
+                               List<ColumnId> nonKeyColumns, List<ColumnId> keyColumns) {
+            this.tableId = tableId;
+            this.columns = columns;
+            this.nonKeyColumns = nonKeyColumns;
+            this.keyColumns = keyColumns;
+        }
+
+        public static TableDefinition of(TableId tableId, List<ColumnId> columns) {
+            return new TableDefinition(tableId, columns);
+        }
+
+        public static TableDefinition of(TableId tableId, List<ColumnId> columns,
+                                         List<ColumnId> nonKeyColumns, List<ColumnId> keyColumns) {
+            return new TableDefinition(tableId, columns, nonKeyColumns, keyColumns);
+        }
+
     }
 
     /**
@@ -130,8 +154,10 @@ public class JdbcUtils {
     /**
      * Get the {@link TableDefinition} for the given table.
      */
-    public static TableDefinition getTableDefinition(Connection connection, TableId tableId) throws Exception {
-        TableDefinition table = TableDefinition.of(tableId, Lists.newArrayList());
+    public static TableDefinition getTableDefinition(
+            Connection connection, TableId tableId, List<String> keyList, List<String> nonKeyList) throws Exception {
+        TableDefinition table = TableDefinition.of(
+                tableId, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
 
         try (ResultSet rs = connection.getMetaData().getColumns(
             tableId.getCatalogName(),
@@ -146,7 +172,23 @@ public class JdbcUtils {
                 final String typeName = rs.getString(6);
                 final int position = rs.getInt(17);
 
-                table.columns.add(ColumnId.of(tableId, columnName, sqlDataType, typeName, position));
+                ColumnId columnId = ColumnId.of(tableId, columnName, sqlDataType, typeName, position);
+                table.columns.add(columnId);
+                if (keyList != null) {
+                    keyList.forEach((key) -> {
+                        if (key.equals(columnName)) {
+                            table.keyColumns.add(columnId);
+                        }
+                    });
+                }
+                if (nonKeyList != null) {
+                    nonKeyList.forEach((key) -> {
+                        if (key.equals(columnName)) {
+                            table.nonKeyColumns.add(columnId);
+                        }
+                    });
+                }
+
                 if (log.isDebugEnabled()) {
                     log.debug("Get column. name: {}, data type: {}, position: {}", columnName, typeName, position);
                 }
@@ -175,4 +217,54 @@ public class JdbcUtils {
         return connection.prepareStatement(insertSQL);
     }
 
+    public static String combationWhere(List<ColumnId> columnIds) {
+        StringBuilder builder = new StringBuilder();
+        if (!columnIds.isEmpty()) {
+            builder.append(" WHERE ");
+            StringJoiner whereJoiner = new StringJoiner(" AND ");
+            columnIds.forEach((columnId -> {
+                StringJoiner equals = new StringJoiner("=");
+                equals.add(columnId.getName()).add("?");
+                whereJoiner.add(equals.toString());
+            }));
+            builder.append(whereJoiner.toString());
+            return builder.toString();
+        }
+        return "";
+    }
+
+    public static String buildUpdateSql(TableDefinition table) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("UPDATE ");
+        builder.append(table.tableId.getTableName());
+        builder.append(" SET ");
+        StringJoiner setJoiner = new StringJoiner(",");
+
+        table.nonKeyColumns.forEach((columnId) ->{
+            StringJoiner equals = new StringJoiner("=");
+            equals.add(columnId.getName()).add("? ");
+            setJoiner.add(equals.toString());
+        });
+        builder.append(setJoiner.toString());
+        builder.append(combationWhere(table.keyColumns));
+        return builder.toString();
+    }
+
+    public static PreparedStatement buildUpdateStatement(Connection connection, String updateSQL) throws SQLException {
+        return connection.prepareStatement(updateSQL);
+    }
+
+    public static String buildDeleteSql(TableDefinition table) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("DELETE ");
+        builder.append("FROM ");
+        builder.append(table.tableId.getTableName());
+        builder.append(combationWhere(table.keyColumns));
+        return builder.toString();
+    }
+
+    public static PreparedStatement buildDeleteStatement(Connection connection, String deleteSQL) throws SQLException {
+        return connection.prepareStatement(deleteSQL);
+    }
+
 }
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index aec9d7e..583c474 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -49,7 +49,8 @@ import static org.mockito.Mockito.when;
 @Slf4j
 public class JdbcSinkTest {
     private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
-    private Message<GenericRecord> message;
+    private JdbcAutoSchemaSink jdbcSink;
+    private final String tableName = "TestOpenAndWriteSink";
 
     /**
      * A Simple class to test jdbc class
@@ -66,79 +67,179 @@ public class JdbcSinkTest {
     @BeforeMethod
     public void setUp() throws Exception {
         sqliteUtils.setUp();
-    }
+        sqliteUtils.createTable(
+                "CREATE TABLE " + tableName + "(" +
+                        "    field1  TEXT," +
+                        "    field2  TEXT," +
+                        "    field3 INTEGER," +
+                        "PRIMARY KEY (field1));"
+        );
 
-    @AfterMethod
-    public void tearDown() throws Exception {
-        sqliteUtils.tearDown();
-    }
+        // prepare data for udpate sql
+        String updateSql = "insert into " + tableName + " values('ValueOfField4', 'ValueOfField4', 4)";
+        sqliteUtils.execute(updateSql);
+
+        // prepare data for delete sql
+        String deleteSql = "insert into " + tableName + " values('ValueOfField5', 'ValueOfField5', 5)";
+        sqliteUtils.execute(deleteSql);
 
-    @Test
-    public void TestOpenAndWriteSink() throws Exception {
-        message = mock(MessageImpl.class);
-        JdbcAutoSchemaSink jdbcSink;
         Map<String, Object> conf;
-        String tableName = "TestOpenAndWriteSink";
-        GenericSchema<GenericRecord> genericAvroSchema;
 
         String jdbcUrl = sqliteUtils.sqliteUri();
+
         conf = Maps.newHashMap();
         conf.put("jdbcUrl", jdbcUrl);
         conf.put("tableName", tableName);
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        // change batchSize to 1, to flush on each write.
+        conf.put("batchSize", 1);
 
         jdbcSink = new JdbcAutoSchemaSink();
+        jdbcSink = new JdbcAutoSchemaSink();
 
-        sqliteUtils.createTable(
-            "CREATE TABLE " + tableName + "(" +
-                "    field1  TEXT," +
-                "    field2  TEXT," +
-                "    field3 INTEGER," +
-                "PRIMARY KEY (field1));"
-        );
+        // open should success
+        jdbcSink.open(conf, null);
 
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        sqliteUtils.tearDown();
+        jdbcSink.close();
+    }
+
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+        GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a foo Record
-        Foo obj = new Foo();
-        obj.setField1("ValueOfField1");
-        obj.setField2("ValueOfField1");
-        obj.setField3(3);
+        Foo insertObj = new Foo();
+        insertObj.setField1("ValueOfField1");
+        insertObj.setField2("ValueOfField1");
+        insertObj.setField3(3);
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
-        byte[] bytes = schema.encode(obj);
+        byte[] insertBytes = schema.encode(insertObj);
 
-        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
-            .message(message)
-            .topicName("fake_topic_name")
+        Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+            .message(insertMessage)
+            .topicName("fake_topic_name").ackFunction(new Runnable(){
+                    public void run(){
+                    }
+                })
             .build();
 
         genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
 
-        when(message.getValue())
-                .thenReturn(genericAvroSchema.decode(bytes));
+        Map<String, String> insertProperties = Maps.newHashMap();
+        insertProperties.put("ACTION", "INSERT");
+        when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
+        when(insertMessage.getProperties()).thenReturn(insertProperties);
         log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
-            obj.toString(),
-            message.getValue().toString(),
-            record.getValue().toString());
-
-        // change batchSize to 1, to flush on each write.
-        conf.put("batchSize", 1);
-        // open should success
-        jdbcSink.open(conf, null);
+                insertObj.toString(),
+                insertMessage.getValue().toString(),
+                insertRecord.getValue().toString());
 
         // write should success.
-        jdbcSink.write(record);
+        jdbcSink.write(insertRecord);
         log.info("executed write");
         // sleep to wait backend flush complete
-        Thread.sleep(500);
+        Thread.sleep(1000);
 
         // value has been written to db, read it out and verify.
-        String querySql = "SELECT * FROM " + tableName;
-        sqliteUtils.select(querySql, (resultSet) -> {
-            Assert.assertEquals(obj.getField1(), resultSet.getString(1));
-            Assert.assertEquals(obj.getField2(), resultSet.getString(2));
-            Assert.assertEquals(obj.getField3(), resultSet.getInt(3));
+        String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
+        int count = sqliteUtils.select(querySql, (resultSet) -> {
+            Assert.assertEquals(insertObj.getField1(), resultSet.getString(1));
+            Assert.assertEquals(insertObj.getField2(), resultSet.getString(2));
+            Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3));
         });
+        Assert.assertEquals(count, 1);
 
-        jdbcSink.close();
+    }
+
+
+    @Test
+    public void TestUpdateAction() throws Exception {
+
+        AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Foo updateObj = new Foo();
+        updateObj.setField1("ValueOfField3");
+        updateObj.setField2("ValueOfField3");
+        updateObj.setField3(4);
+
+        byte[] updateBytes = schema.encode(updateObj);
+        Message<GenericRecord> updateMessage = mock(MessageImpl.class);
+        Record<GenericRecord> updateRecord = PulsarRecord.<GenericRecord>builder()
+                .message(updateMessage)
+                .topicName("fake_topic_name").ackFunction(new Runnable(){
+                    public void run(){
+                    }
+                })
+                .build();
+
+        GenericSchema<GenericRecord> updateGenericAvroSchema;
+        updateGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        Map<String, String> updateProperties = Maps.newHashMap();
+        updateProperties.put("ACTION", "UPDATE");
+        when(updateMessage.getValue()).thenReturn(updateGenericAvroSchema.decode(updateBytes));
+        when(updateMessage.getProperties()).thenReturn(updateProperties);
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+                updateObj.toString(),
+                updateMessage.getValue().toString(),
+                updateRecord.getValue().toString());
+
+        jdbcSink.write(updateRecord);
+
+        Thread.sleep(1000);
+
+        // value has been written to db, read it out and verify.
+        String updateQuerySql = "SELECT * FROM " + tableName + " WHERE field3=4";
+        sqliteUtils.select(updateQuerySql, (resultSet) -> {
+            Assert.assertEquals(updateObj.getField1(), resultSet.getString(1));
+            Assert.assertEquals(updateObj.getField2(), resultSet.getString(2));
+            Assert.assertEquals(updateObj.getField3(), resultSet.getInt(3));
+        });
+    }
+
+    @Test
+    public void TestDeleteAction() throws Exception {
+
+        AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Foo deleteObj = new Foo();
+        deleteObj.setField3(5);
+
+        byte[] deleteBytes = schema.encode(deleteObj);
+        Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
+        Record<GenericRecord> deleteRecord = PulsarRecord.<GenericRecord>builder()
+                .message(deleteMessage)
+                .topicName("fake_topic_name").ackFunction(new Runnable(){
+                    public void run(){
+                    }
+                })
+                .build();
+
+        GenericSchema<GenericRecord> deleteGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+        Map<String, String> deleteProperties = Maps.newHashMap();
+        deleteProperties.put("ACTION", "DELETE");
+        when(deleteMessage.getValue()).thenReturn(deleteGenericAvroSchema.decode(deleteBytes));
+        when(deleteMessage.getProperties()).thenReturn(deleteProperties);
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+                deleteObj.toString(),
+                deleteMessage.getValue().toString(),
+                deleteRecord.getValue().toString());
+
+        jdbcSink.write(deleteRecord);
+
+        Thread.sleep(1000);
+
+        // value has been written to db, read it out and verify.
+        String deleteQuerySql = "SELECT * FROM " + tableName + " WHERE field3=5";
+        Assert.assertEquals(sqliteUtils.select(deleteQuerySql, (resultSet) -> {}), 0);
     }
 
 }
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
index d58802d..408f477 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
@@ -22,9 +22,12 @@ package org.apache.pulsar.io.jdbc;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.List;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition;
 import org.apache.pulsar.io.jdbc.JdbcUtils.TableId;
+import org.assertj.core.util.Lists;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -75,21 +78,44 @@ public class JdbcUtilsTest {
 
         // Test get getTableDefinition
         log.info("verify getTableDefinition");
-        TableDefinition table = JdbcUtils.getTableDefinition(connection, id);
+        List<String> keyList = Lists.newArrayList();
+        keyList.add("firstName");
+        keyList.add("lastName");
+        List<String> nonKeyList = Lists.newArrayList();
+        nonKeyList.add("age");
+        nonKeyList.add("long");
+        TableDefinition table = JdbcUtils.getTableDefinition(connection, id, keyList, nonKeyList);
         Assert.assertEquals(table.getColumns().get(0).getName(), "firstName");
         Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT");
         Assert.assertEquals(table.getColumns().get(2).getName(), "age");
         Assert.assertEquals(table.getColumns().get(2).getTypeName(), "INTEGER");
         Assert.assertEquals(table.getColumns().get(7).getName(), "float");
         Assert.assertEquals(table.getColumns().get(7).getTypeName(), "NUMERIC");
-
+        Assert.assertEquals(table.getKeyColumns().get(0).getName(), "firstName");
+        Assert.assertEquals(table.getKeyColumns().get(0).getTypeName(), "TEXT");
+        Assert.assertEquals(table.getKeyColumns().get(1).getName(), "lastName");
+        Assert.assertEquals(table.getKeyColumns().get(1).getTypeName(), "TEXT");
+        Assert.assertEquals(table.getNonKeyColumns().get(0).getName(), "age");
+        Assert.assertEquals(table.getNonKeyColumns().get(0).getTypeName(), "INTEGER");
+        Assert.assertEquals(table.getNonKeyColumns().get(1).getName(), "long");
+        Assert.assertEquals(table.getNonKeyColumns().get(1).getTypeName(), "INTEGER");
         // Test get getTableDefinition
         log.info("verify buildInsertSql");
-        String expctedStatement = "INSERT INTO " + tableName +
+        String expctedInsertStatement = "INSERT INTO " + tableName +
             "(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" +
             " VALUES(?,?,?,?,?,?,?,?,?,?)";
-        String statement = JdbcUtils.buildInsertSql(table);
-        Assert.assertEquals(statement, expctedStatement);
+        String insertStatement = JdbcUtils.buildInsertSql(table);
+        Assert.assertEquals(insertStatement, expctedInsertStatement);
+        log.info("verify buildUpdateSql");
+        String expectedUpdateStatement = "UPDATE " + tableName +
+                " SET age=? ,long=?  WHERE firstName=? AND lastName=?";
+        String updateStatement = JdbcUtils.buildUpdateSql(table);
+        Assert.assertEquals(updateStatement, expectedUpdateStatement);
+        log.info("verify buildDeleteSql");
+        String expectedDeleteStatement = "DELETE FROM " + tableName +
+                " WHERE firstName=? AND lastName=?";
+        String deleteStatement = JdbcUtils.buildDeleteSql(table);
+        Assert.assertEquals(deleteStatement, expectedDeleteStatement);
     }
 
 }
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index e5e112d..0b70410 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -27,3 +27,4 @@ Pulsar Functions cluster.
 - [Redis Sink Connector](io-redis.md#sink)
 - [Solr Sink Connector](io-solr.md#sink)
 - [InfluxDB Sink Connector](io-influxdb.md#sink)
+- [JDBC Sink Connector](io-jdbc.md)
diff --git a/site2/docs/io-jdbc.md b/site2/docs/io-jdbc.md
new file mode 100644
index 0000000..1ded39a
--- /dev/null
+++ b/site2/docs/io-jdbc.md
@@ -0,0 +1,23 @@
+---
+id: io-jdbc
+title: JDBC Connector
+sidebar_label: JDBC Connector
+---
+
+## Sink
+
+The JDBC Sink Connector is used to pull messages from Pulsar topics and persist the messages to a MySQL or SQLite database.
+Currently, INSERT, DELETE and UPDATE operations are supported.
+
+### Sink Configuration Options
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| userName | `false` | `` | Username used to connect to the database specified by `jdbcUrl`. |
+| password | `false` | `` | Password used to connect to the database specified by `jdbcUrl`. |
+| jdbcUrl | `true` | `` | The JDBC url of the database this connector connects to. |
+| tableName | `true` | `` | The name of the table this connector writes messages to. |
+| nonKey | `false` | `` | Fields used in update events. A comma-separated list. |
+| key | `false` | `` | Fields used in the WHERE condition of update and delete events. A comma-separated list. |
+| timeoutMs | `false` | `500` | The JDBC operation timeout in milliseconds. |
+| batchSize | `false` | `200` | The batch size of updates made to the database. |
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d7e4404..d96c3a9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.Test;
+import org.testng.collections.Maps;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -174,16 +175,34 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         // produce messages
         Map<String, String> kvs;
         if (tester instanceof JdbcSinkTester) {
-            kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
+            kvs = produceSchemaInsertMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
+            // wait for sink to process messages
+            waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
+            // validate the sink result
+            tester.validateSinkResult(kvs);
+
+            kvs = produceSchemaUpdateMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
+
+            // wait for sink to process messages
+            waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20);
+            // validate the sink result
+            tester.validateSinkResult(kvs);
+
+            kvs = produceSchemaDeleteMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
+
+            // wait for sink to process messages
+            waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20 + 20);
+            // validate the sink result
+            tester.validateSinkResult(kvs);
+
         } else {
             kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
+            // wait for sink to process messages
+            waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
+            // validate the sink result
+            tester.validateSinkResult(kvs);
         }
 
-        // wait for sink to process messages
-        waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
-
-        // validate the sink result
-        tester.validateSinkResult(kvs);
 
         // update the sink
         updateSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
@@ -364,7 +383,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     }
 
     // This for JdbcSinkTester
-    protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName,
+    protected Map<String, String> produceSchemaInsertMessagesToInputTopic(String inputTopicName,
                                                                     int numMessages,
                                                                     Schema<Foo> schema) throws Exception {
         @Cleanup
@@ -380,16 +399,92 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             String key = "key-" + i;
 
             JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
-            obj.setField1("field1_" + i);
-            obj.setField2("field2_" + i);
+            obj.setField1("field1_insert_" + i);
+            obj.setField2("field2_insert_" + i);
             obj.setField3(i);
             String value = new String(schema.encode(obj));
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put("ACTION", "INSERT");
 
             kvs.put(key, value);
+            kvs.put("ACTION", "INSERT");
             producer.newMessage()
-                .key(key)
-                .value(obj)
-                .send();
+                    .properties(properties)
+                    .key(key)
+                    .value(obj)
+                    .send();
+        }
+        return kvs;
+    }
+
+    // This is for JdbcSinkTester: produces messages carrying the property ACTION=UPDATE
+    protected Map<String, String> produceSchemaUpdateMessagesToInputTopic(String inputTopicName,
+                                                                    int numMessages,
+                                                                    Schema<Foo> schema) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+        @Cleanup
+        Producer<Foo> producer = client.newProducer(schema)
+                .topic(inputTopicName)
+                .create();
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+        log.info("update start");
+        for (int i = 0; i < numMessages; i++) {
+            String key = "key-" + i;
+
+            JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
+            obj.setField1("field1_insert_" + i);
+            obj.setField2("field2_update_" + i);
+            obj.setField3(i);
+            String value = new String(schema.encode(obj));
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put("ACTION", "UPDATE");
+
+            kvs.put(key, value);
+            kvs.put("ACTION", "UPDATE");
+            producer.newMessage()
+                    .properties(properties)
+                    .key(key)
+                    .value(obj)
+                    .send();
+        }
+        log.info("update end");
+        return kvs;
+    }
+
+    // This is for JdbcSinkTester: produces messages carrying the property ACTION=DELETE
+    protected Map<String, String> produceSchemaDeleteMessagesToInputTopic(String inputTopicName,
+                                                                          int numMessages,
+                                                                          Schema<Foo> schema) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+        @Cleanup
+        Producer<Foo> producer = client.newProducer(schema)
+                .topic(inputTopicName)
+                .create();
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+        for (int i = 0; i < numMessages; i++) {
+            String key = "key-" + i;
+
+            JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
+            obj.setField1("field1_insert_" + i);
+            obj.setField2("field2_update_" + i);
+            obj.setField3(i);
+            String value = new String(schema.encode(obj));
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put("ACTION", "DELETE");
+
+            kvs.put(key, value);
+            kvs.put("ACTION", "DELETE");
+            producer.newMessage()
+                    .properties(properties)
+                    .key(key)
+                    .value(obj)
+                    .send();
         }
         return kvs;
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index da4331a..9904709 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
 import com.github.dockerjava.api.command.CreateContainerCmd;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -38,6 +35,9 @@ import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.MySQLContainer;
 
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 /**
  * A tester for testing jdbc sink.
  * This will use MySql as DB server
@@ -71,6 +71,8 @@ public class JdbcSinkTester extends SinkTester<MySQLContainer> {
         // container default value is test
         sinkConfig.put("userName", "test");
         sinkConfig.put("password", "test");
+        sinkConfig.put("nonKey", "field2,field3");
+        sinkConfig.put("key", "field1");
         sinkConfig.put("tableName", tableName);
         sinkConfig.put("batchSize", 1);
     }
@@ -127,6 +129,10 @@ public class JdbcSinkTester extends SinkTester<MySQLContainer> {
             PreparedStatement statement = connection.prepareStatement(querySql);
             rs = statement.executeQuery();
 
+            if (kvs.get("ACTION").equals("DELETE")) {
+                assertFalse(rs.first());
+                return;
+            }
             while (rs.next()) {
                 String field1 = rs.getString(1);
                 String field2 = rs.getString(2);