Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:25 UTC

[rocketmq-connect] 17/43: Develop TimestampIncrementingQuerier Mode

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 1376d8206633c494dab351b0b88723be0a6af102
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Thu Aug 15 21:57:54 2019 +0800

    Develop TimestampIncrementingQuerier Mode
---
 .../org/apache/rocketmq/connect/jdbc/Config.java   |   7 +-
 .../jdbc/connector/JdbcSourceConnector.java        |  18 +-
 .../connect/jdbc/connector/JdbcSourceTask.java     | 100 +++++--
 .../rocketmq/connect/jdbc/source/JdbcUtils.java    | 198 +++++++++++++
 .../rocketmq/connect/jdbc/source/Querier.java      |  77 +++--
 .../jdbc/source/TimestampIncrementingQuerier.java  | 315 +++++++++++++++++++++
 .../connect/jdbc/connector/JdbcSourceTaskTest.java |  89 +++++-
 7 files changed, 727 insertions(+), 77 deletions(-)
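
For reference, a minimal sketch of how a task using the new mode might be configured and
started, based only on the required keys in Config and the keys exercised by
JdbcSourceTaskTest in this commit (host, credentials, and column names are placeholders):

    import io.openmessaging.KeyValue;
    import io.openmessaging.internal.DefaultKeyValue;
    import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;

    public class TimestampIncrementingExample {
        public static void main(String[] args) {
            KeyValue kv = new DefaultKeyValue();
            kv.put("jdbcUrl", "localhost:3306");        // placeholder host:port
            kv.put("jdbcUsername", "root");             // placeholder credentials
            kv.put("jdbcPassword", "secret");
            kv.put("mode", "incrementing+timestamp");   // any value other than "bulk" selects the new querier
            kv.put("rocketmqTopic", "JdbcTopic");
            kv.put("incrementingColumnName", "id");     // auto-increment column to track
            kv.put("timestampColmnName", "timestamp");  // key spelling follows Config.getTimestampColmnName()
            JdbcSourceTask task = new JdbcSourceTask();
            task.start(kv);
            System.out.println(task.poll());            // Collection<SourceDataEntry>
            task.stop();
        }
    }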

diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
index 533d53b..f93c4db 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
@@ -27,8 +27,6 @@ import java.util.List;
 import java.util.Set;
 
 public class Config {
-    @SuppressWarnings("serial")
-
     private static final Logger LOG = LoggerFactory.getLogger(Config.class);
 
     /* Database Connection Config */
@@ -68,11 +66,12 @@ public class Config {
             add("jdbcUrl");
             add("jdbcUsername");
             add("jdbcPassword");
-            //    add("mode");
-            //    add("rocketmqTopic");
+            add("mode");
+            add("rocketmqTopic");
         }
     };
 
+
     public void load(KeyValue props) {
         log.info("Config.load.start");
         properties2Object(props, this);
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index bdbeb8b..4a870c0 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -19,17 +19,15 @@ package org.apache.rocketmq.connect.jdbc.connector;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
-
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
 
 import org.apache.rocketmq.connect.jdbc.Config;
-import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+
 public class JdbcSourceConnector extends SourceConnector {
     private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnector.class);
     private KeyValue config;
@@ -37,7 +35,7 @@ public class JdbcSourceConnector extends SourceConnector {
     @Override
     public String verifyAndSetConfig(KeyValue config) {
 
-        log.info("JdbcSourceConnector verifyAndSetConfig enter");
+        log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter");
         for (String requestKey : Config.REQUEST_CONFIG) {
 
             if (!config.containsKey(requestKey)) {
@@ -59,11 +57,13 @@ public class JdbcSourceConnector extends SourceConnector {
 
     }
 
-    @Override public void pause() {
+    @Override
+    public void pause() {
 
     }
 
-    @Override public void resume() {
+    @Override
+    public void resume() {
 
     }
 
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 91659ec..45252bb 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -1,5 +1,4 @@
 
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -20,20 +19,31 @@
 package org.apache.rocketmq.connect.jdbc.connector;
 
 import io.openmessaging.connector.api.source.SourceTask;
+
 import java.nio.ByteBuffer;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.connect.jdbc.Config;
 import org.apache.rocketmq.connect.jdbc.schema.Table;
 import org.apache.rocketmq.connect.jdbc.source.Querier;
+import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingQuerier;
 import org.apache.rocketmq.connect.jdbc.schema.column.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
+
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.data.EntryType;
 import io.openmessaging.connector.api.data.Schema;
@@ -49,26 +59,31 @@ public class JdbcSourceTask extends SourceTask {
 
     private Config config;
 
-    private List<Table> list = new LinkedList<>();
-
-    Querier querier = new Querier();
+    BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>();
+    static final String INCREMENTING_FIELD = "incrementing";
+    static final String TIMESTAMP_FIELD = "timestamp";
+    private Querier querier;
 
     @Override
     public Collection<SourceDataEntry> poll() {
         List<SourceDataEntry> res = new ArrayList<>();
         try {
-
-            JSONObject jsonObject = new JSONObject();
-            jsonObject.put("nextQuery", "database");
-            jsonObject.put("nextPosition", "10");
-            //To be Continued
+            if (tableQueue.size() > 1)
+                querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+            else
+                querier = tableQueue.peek();
+            // pause briefly between polls (1000 milliseconds)
+            Thread.sleep(1000);
             querier.poll();
-            log.info("querier.poll, start");
-			int mm = 0;
             for (Table dataRow : querier.getList()) {
-                System.out.println(dataRow.getColList().get(0));
-                log.info("xunhuankaishi");
-                log.info("Received {} record: {} ", dataRow.getColList().get(0), mm++);
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put("nextQuery", "database");
+                jsonObject.put("nextPosition", "table");
                 Schema schema = new Schema();
                 schema.setDataSource(dataRow.getDatabase());
                 schema.setName(dataRow.getName());
@@ -80,37 +95,60 @@ public class JdbcSourceTask extends SourceTask {
                     schema.getFields().add(field);
                 }
                 DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
-                dataEntryBuilder.timestamp(System.currentTimeMillis())
-                    .queue(dataRow.getName())
-                    .entryType(EntryType.CREATE);
+                dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
+                        .entryType(EntryType.CREATE);
                 for (int i = 0; i < dataRow.getColList().size(); i++) {
                     Object value = dataRow.getDataList().get(i);
-                    System.out.println(1);
-                    System.out.println(dataRow.getColList().get(i) + "|" + value);
-                    dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSON.toJSONString(value));
+                    // System.out.println(dataRow.getColList().get(i) + "|" + value);
+                    dataEntryBuilder.putFiled(dataRow.getColList().get(i), value);
                 }
+
                 SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                    ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")),
-                    ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
+                        ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")),
+                        ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
                 res.add(sourceDataEntry);
+
             }
         } catch (Exception e) {
             log.error("JDBC task poll error, current config:" + JSON.toJSONString(config), e);
         }
+        log.info("dataEntry poll successfully,{}", res);
         return res;
     }
 
     @Override
     public void start(KeyValue props) {
         try {
-            this.config = new Config();
-            this.config.load(props);
-            log.info("querier.start");
-            querier.start();
-
+            config = new Config();
+            config.load(props);
         } catch (Exception e) {
-            log.error("JDBC task start failed.", e);
+            log.error("Cannot start Jdbc Source Task because of configuration error{}", e);
+        }
+        Map<Map<String, String>, Map<String, Object>> offsets = null;
+        String mode = config.mode;
+        if (mode.equals("bulk")) {
+            Querier querier = new Querier();
+            try {
+                querier.setConfig(config);
+                querier.start();
+                tableQueue.add(querier);
+            } catch (Exception e) {
+                log.error("Failed to start bulk-mode Querier", e);
+            }
+        } else {
+            TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier();
+            try {
+                querier.setConfig(config);
+                querier.start();
+                tableQueue.add(querier);
+            } catch (Exception e) {
+                log.error("Failed to start TimestampIncrementingQuerier", e);
+            }
+
         }
+
     }
 
     @Override
@@ -118,11 +156,13 @@ public class JdbcSourceTask extends SourceTask {
         querier.stop();
     }
 
-    @Override public void pause() {
+    @Override
+    public void pause() {
 
     }
 
-    @Override public void resume() {
+    @Override
+    public void resume() {
 
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java
new file mode 100644
index 0000000..cbcca6a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java
@@ -0,0 +1,198 @@
+
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.rocketmq.connect.jdbc.source;
+import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+
+/**
+ * Utilties for interacting with a JDBC database.
+ */
+public class JdbcUtils {
+
+  private static final Logger log = LoggerFactory.getLogger(JdbcUtils.class);
+
+  /**
+   * The default table types to include when listing tables if none are specified. Valid values
+   * are those specified by the {@link java.sql.DatabaseMetaData#getTables} method's TABLE_TYPE column.
+   * The default only includes standard, user-defined tables.
+   */
+  public static final Set<String> DEFAULT_TABLE_TYPES = Collections.unmodifiableSet(
+      new HashSet<String>(Arrays.asList("TABLE"))
+  );
+
+  private static final int GET_TABLES_TYPE_COLUMN = 4;
+  private static final int GET_TABLES_NAME_COLUMN = 3;
+
+  private static final int GET_COLUMNS_COLUMN_NAME = 4;
+  private static final int GET_COLUMNS_IS_NULLABLE = 18;
+  private static final int GET_COLUMNS_IS_AUTOINCREMENT = 23;
+
+
+  private static ThreadLocal<SimpleDateFormat> DATE_FORMATTER = new ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+      return sdf;
+    }
+  };
+
+  /**
+   * Get a list of tables in the database. This uses the default filters, which only include
+   * user-defined tables.
+   * @param conn database connection
+   * @return a list of tables
+   * @throws SQLException
+   */
+  public static List<String> getTables(Connection conn) throws SQLException {
+    return getTables(conn, DEFAULT_TABLE_TYPES);
+  }
+
+  /**
+   * Get a list of table names in the database.
+   * @param conn database connection
+   * @param types a set of table types that should be included in the results
+   * @throws SQLException
+   */
+  public static List<String> getTables(Connection conn, Set<String> types) throws SQLException {
+    DatabaseMetaData metadata = conn.getMetaData();
+    ResultSet rs = metadata.getTables(null, null, "%", null);
+    List<String> tableNames = new ArrayList<String>();
+    while (rs.next()) {
+      if (types.contains(rs.getString(GET_TABLES_TYPE_COLUMN))) {
+        String colName = rs.getString(GET_TABLES_NAME_COLUMN);
+        // SQLite JDBC driver does not correctly mark these as system tables
+        if (metadata.getDatabaseProductName().equals("SQLite") && colName.startsWith("sqlite_")) {
+          continue;
+        }
+
+        tableNames.add(colName);
+      }
+    }
+    return tableNames;
+  }
+
+  /**
+   * Look up the autoincrement column for the specified table.
+   * @param conn database connection
+   * @param table the table to check for an autoincrement column
+   * @return the name of the column that is an autoincrement column, or null if there is no
+   *         autoincrement column or more than one exists
+   * @throws SQLException
+   */
+  public static String getAutoincrementColumn(Connection conn, String table) throws SQLException {
+    String result = null;
+    int matches = 0;
+
+    ResultSet rs = conn.getMetaData().getColumns(null, null, table, "%");
+    // Some database drivers (SQLite) don't include all the columns
+    if (rs.getMetaData().getColumnCount() >= GET_COLUMNS_IS_AUTOINCREMENT) {
+      while(rs.next()) {
+        if (rs.getString(GET_COLUMNS_IS_AUTOINCREMENT).equals("YES")) {
+          result = rs.getString(GET_COLUMNS_COLUMN_NAME);
+          matches++;
+        }
+      }
+      return (matches == 1 ? result : null);
+    }
+
+    // Fallback approach is to query for a single row. This unfortunately does not work with any
+    // empty table
+    log.trace("Falling back to SELECT detection of auto-increment column for {}:{}", conn, table);
+    Statement stmt = conn.createStatement();
+    try {
+      String quoteString = getIdentifierQuoteString(conn);
+      rs = stmt.executeQuery("SELECT * FROM " + quoteString + table + quoteString + " LIMIT 1");
+      ResultSetMetaData rsmd = rs.getMetaData();
+      for (int i = 1; i <= rsmd.getColumnCount(); i++) {
+        if (rsmd.isAutoIncrement(i)) {
+          result = rsmd.getColumnName(i);
+          matches++;
+        }
+      }
+    } finally {
+      rs.close();
+      stmt.close();
+    }
+    return (matches == 1 ? result : null);
+  }
+
+  public static boolean isColumnNullable(Connection conn, String table, String column)
+      throws SQLException {
+    ResultSet rs = conn.getMetaData().getColumns(null, null, table, column);
+    if (rs.getMetaData().getColumnCount() > GET_COLUMNS_IS_NULLABLE) {
+      // Should only be one match
+      if (!rs.next()) {
+        return false;
+      }
+      return "YES".equals(rs.getString(GET_COLUMNS_IS_NULLABLE));
+    }
+
+    return false;
+  }
+
+  /**
+   * Format the given Date assuming UTC timezone in a format supported by SQL.
+   * @param date the date to convert to a String
+   * @return the formatted string
+   */
+  public static String formatUTC(Date date) {
+    return DATE_FORMATTER.get().format(date);
+  }
+
+  /**
+   * Get the string used for quoting identifiers in this database's SQL dialect.
+   * @param connection the database connection
+   * @return the quote string
+   * @throws SQLException
+   */
+  public static String getIdentifierQuoteString(Connection connection) throws SQLException {
+    String quoteString = connection.getMetaData().getIdentifierQuoteString();
+    quoteString = quoteString == null ? "" : quoteString;
+    return quoteString;
+  }
+
+  /**
+   * Quote the given string.
+   * @param orig the string to quote
+   * @param quote the quote character
+   * @return the quoted string
+   */
+  public static String quoteString(String orig, String quote) {
+    return quote + orig + quote;
+  }
+}
+
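
For orientation, a rough sketch of how these utilities might be exercised against a live
connection; JdbcUtils only needs a java.sql.Connection, and the MySQL URL and credentials
below are placeholders:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.List;

    import org.apache.rocketmq.connect.jdbc.source.JdbcUtils;

    public class JdbcUtilsExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/jdbc_db?useSSL=false", "root", "secret")) {
                // DEFAULT_TABLE_TYPES only includes "TABLE", i.e. user-defined tables
                List<String> tables = JdbcUtils.getTables(conn);
                for (String table : tables) {
                    // null means no auto-increment column, or more than one candidate
                    String autoInc = JdbcUtils.getAutoincrementColumn(conn, table);
                    System.out.println(table + " -> auto-increment column: " + autoInc);
                }
            }
        }
    }
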
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index 073a896..0907d40 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -13,27 +13,55 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-
 import javax.sql.DataSource;
-
 import org.apache.rocketmq.connect.jdbc.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.alibaba.druid.pool.DruidDataSourceFactory;
-
 import org.apache.rocketmq.connect.jdbc.schema.*;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
 
 public class Querier {
-    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+
     private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass
     protected String topicPrefix;
     protected String jdbcUrl;
     private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
-    private Config config = new Config();
+    private Config config;
+
+    /**
+     * @return the config
+     */
+    public Config getConfig() {
+        return config;
+    }
+
+    public void setConfig(Config config) {
+        this.config = config;
+        log.info("config load successfully");
+    }
+
     private DataSource dataSource;
     private List<Table> list = new LinkedList<>();
+    private String mode;
+
+
+    public DataSource getDataSource() {
+        return dataSource;
+    }
+
+    public void setDataSource(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+
 
     public List<Table> getList() {
         return list;
@@ -44,7 +72,6 @@ public class Querier {
     }
 
     public Connection getConnection() throws SQLException {
-
         // These config names are the same for both source and sink configs ...
         String username = config.jdbcUsername;
         String dbPassword = config.jdbcPassword;
@@ -76,7 +103,7 @@ public class Querier {
     protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException {
 
         String SQL = "select table_name,column_name,data_type,column_type,character_set_name "
-            + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION";
+                + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION";
 
         log.trace("Creating a PreparedStatement '{}'", SQL);
         PreparedStatement stmt = db.prepareStatement(SQL);
@@ -96,30 +123,29 @@ public class Querier {
         return stmt.executeQuery();
     }
 
+    private Schema schema;
+
     public static void main(String[] args) throws Exception {
-        Querier querier = new Querier();
+        TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier();
         try {
             querier.start();
             querier.poll();
-
         } catch (SQLException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
-
     }
 
-    private Schema schema;
-
     public void poll() {
         try {
 
             PreparedStatement stmt;
             String query = "select * from ";
             Connection conn = dataSource.getConnection();
-
             for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
                 String db = entry.getKey();
+                if (!db.contains("jdbc_db"))
+                    continue;
                 Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
                 while (iterator.hasNext()) {
                     Map.Entry<String, Table> tableEntry = iterator.next();
@@ -155,7 +181,13 @@ public class Querier {
     }
 
     public void start() throws Exception {
-        initDataSource();
+        try {
+            initDataSource();
+            log.info("data source initialized successfully");
+        } catch (Throwable exception) {
+            log.error("failed to initialize data source", exception);
+        }
         schema = new Schema(dataSource);
         schema.load();
         log.info("schema load successful");
@@ -163,9 +195,10 @@ public class Querier {
 
     private void initDataSource() throws Exception {
         Map<String, String> map = new HashMap<>();
+
         map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
         map.put("url",
-            "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+                "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
         map.put("username", config.jdbcUsername);
         map.put("password", config.jdbcPassword);
         map.put("initialSize", "2");
@@ -175,9 +208,15 @@ public class Querier {
         map.put("minEvictableIdleTimeMillis", "300000");
         map.put("validationQuery", "SELECT 1 FROM DUAL");
         map.put("testWhileIdle", "true");
-        log.info("{},config read successful", map);
-        dataSource = DruidDataSourceFactory.createDataSource(map);
-
+        log.info("{} config read successful", map);
+        try {
+            dataSource = DruidDataSourceFactory.createDataSource(map);
+        } catch (Exception exception) {
+            log.info("exeception,{}", exception);
+        } catch (Error e) {
+            log.info("error,{},e", e);
+        }
+        log.info("datasorce success");
     }
 
 }
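
As a companion to the task-level sketch near the top of this mail, the bulk-mode Querier can
also be driven directly, mirroring what JdbcSourceTask.start and poll do; the connection
settings here are again placeholders:

    import io.openmessaging.KeyValue;
    import io.openmessaging.internal.DefaultKeyValue;

    import org.apache.rocketmq.connect.jdbc.Config;
    import org.apache.rocketmq.connect.jdbc.schema.Table;
    import org.apache.rocketmq.connect.jdbc.source.Querier;

    public class BulkQuerierExample {
        public static void main(String[] args) throws Exception {
            KeyValue props = new DefaultKeyValue();
            props.put("jdbcUrl", "localhost:3306");   // placeholder host:port
            props.put("jdbcUsername", "root");        // placeholder credentials
            props.put("jdbcPassword", "secret");
            props.put("mode", "bulk");
            props.put("rocketmqTopic", "JdbcTopic");

            Config config = new Config();
            config.load(props);

            Querier querier = new Querier();
            querier.setConfig(config);
            querier.start();     // builds the Druid pool and loads the schema
            querier.poll();      // runs "select * from <table>" for each table in jdbc_db
            for (Table row : querier.getList()) {
                System.out.println(row.getColList() + " -> " + row.getDataList());
            }
        }
    }
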
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
new file mode 100644
index 0000000..1dadc4f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
@@ -0,0 +1,315 @@
+package org.apache.rocketmq.connect.jdbc.source;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.sql.DataSource;
+
+import org.apache.rocketmq.connect.jdbc.Config;
+import org.apache.rocketmq.connect.jdbc.schema.Database;
+import org.apache.rocketmq.connect.jdbc.schema.Schema;
+import org.apache.rocketmq.connect.jdbc.schema.Table;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+
+
+public class TimestampIncrementingQuerier extends Querier {
+    protected PreparedStatement stmt;
+    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+    protected String jdbcUrl;
+
+    private Config config;
+    private DataSource dataSource;
+    private static final Logger log = LoggerFactory.getLogger(TimestampIncrementingQuerier.class);
+    private List<Table> list = new LinkedList<>();
+    private HashMap<String, Long> incrementingMap;
+    private HashMap<String, Timestamp> timestampMap;
+    private static final Calendar UTC_CALENDAR = new GregorianCalendar(TimeZone.getTimeZone("GMT+8"));
+    private String timestampColumn = "";
+    static final String INCREMENTING_FIELD = "incrementing";
+    static final String TIMESTAMP_FIELD = "timestamp";
+    private Map<String, Long> offset;
+    private Long timestampOffset;
+    private String incrementingColumn = "";
+    private Map<String, String> partition;
+    private Schema schema;
+
+    public String getTimestampColumn() {
+        return timestampColumn;
+    }
+
+    public void setTimestampColumn(String timestampColumn) {
+        this.timestampColumn = timestampColumn;
+    }
+
+    public String getIncrementingColumn() {
+        return incrementingColumn;
+    }
+
+    public void setIncrementingColumn(String incrementingColumn) {
+        this.incrementingColumn = incrementingColumn;
+    }
+
+    private Long incrementingOffset = null;
+    private String name;
+
+    public void extractRecord(String name) throws SQLException {
+        if (incrementingColumn != null) {
+            log.info("{}", name);
+            incrementingMap.put(name, incrementingOffset);
+        }
+        if (timestampColumn != null) {
+            timestampMap.put(name, new Timestamp(timestampOffset));
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    public void storeRecord(String name) throws SQLException {
+        offset = new HashMap<>();
+        if (incrementingColumn != null) {
+            Long id = 0L;
+
+            if (incrementingMap.containsKey(name)) {
+                id = incrementingMap.get(name);
+                System.out.println("read incrementingMap" + id);
+            }
+            assert (incrementingOffset == null || id > incrementingOffset) || timestampColumn != null;
+            incrementingOffset = id;
+            offset.put(INCREMENTING_FIELD, id);
+        }
+        if (timestampColumn != null) {
+            Timestamp timestamp = new Timestamp(0);
+            if (timestampMap.containsKey(name))
+                timestamp = timestampMap.get(name);
+
+            System.out.println("read timestampColumn" + timestamp.toString());
+            timestampOffset = timestamp.getTimezoneOffset() + timestamp.getTime();
+            System.out.println("read" + new Timestamp(timestampOffset));
+            offset.put(TIMESTAMP_FIELD, timestampOffset);
+        }
+        log.info("{}store", new Timestamp(timestampOffset));
+        partition = Collections.singletonMap("table", name);
+    }
+
+    protected void createPreparedStatement(Connection conn) throws SQLException {
+        // Default when unspecified uses an autoincrementing column
+        if (incrementingColumn != null && incrementingColumn.isEmpty()) {
+            incrementingColumn = JdbcUtils.getAutoincrementColumn(conn, name);
+        }
+
+        String quoteString = conn.getMetaData().getIdentifierQuoteString();
+        StringBuilder builder = new StringBuilder();
+        builder.append("SELECT * FROM ");
+        builder.append(name);
+
+        quoteString = quoteString == null ? "" : quoteString;
+
+        if (incrementingColumn != null && timestampColumn != null) {
+            // This version combines two possible conditions. The first checks timestamp ==
+            // last
+            // timestamp and incrementing > last incrementing. The timestamp alone would
+            // include
+            // duplicates, but adding the incrementing condition ensures no duplicates, e.g.
+            // you would
+            // get only the row with id = 23:
+            // timestamp 1234, id 22 <- last
+            // timestamp 1234, id 23
+            // The second check only uses the timestamp >= last timestamp. This covers
+            // everything new,
+            // even if it is an update of the existing row. If we previously had:
+            // timestamp 1234, id 22 <- last
+            // and then these rows were written:
+            // timestamp 1235, id 22
+            // timestamp 1236, id 23
+            // We should capture both id = 22 (an update) and id = 23 (a new row)
+            String timeString = quoteString + timestampColumn + quoteString;
+            String incrString = quoteString + incrementingColumn + quoteString;
+            builder.append(" WHERE ");
+            builder.append(timeString);
+            builder.append(" < CURRENT_TIMESTAMP AND ((");
+            builder.append(timeString);
+            builder.append(" = ? AND ");
+            builder.append(incrString);
+            builder.append(" > ?");
+            builder.append(") OR ");
+            builder.append(timeString);
+            builder.append(" > ?)");
+            builder.append(" ORDER BY ");
+            builder.append(timeString);
+            builder.append(",");
+            builder.append(incrString);
+            builder.append(" ASC");
+
+        } else if (incrementingColumn != null) {
+            String incrString = quoteString + incrementingColumn + quoteString;
+            builder.append(" WHERE ");
+            builder.append(incrString);
+            builder.append(" > ?");
+            builder.append(" ORDER BY ");
+            builder.append(incrString);
+            builder.append(" ASC");
+        } else if (timestampColumn != null) {
+            String timeString = quoteString + timestampColumn + quoteString;
+            builder.append(" WHERE ");
+            builder.append(timeString);
+            builder.append(" > ? AND ");
+            builder.append(timeString);
+            builder.append(" < CURRENT_TIMESTAMP ORDER BY ");
+            builder.append(timeString);
+            builder.append(" ASC");
+        }
+        String queryString = builder.toString();
+        stmt = conn.prepareStatement(queryString);
+        log.info(queryString);
+    }
+
+    public static void main(String[] args) throws Exception {
+        TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier();
+        try {
+            querier.start();
+            querier.poll();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    protected ResultSet executeQuery() throws SQLException {
+        if (incrementingColumn != null && timestampColumn != null) {
+            Timestamp ts = new Timestamp(timestampOffset == null ? 0 : timestampOffset);
+            stmt.setTimestamp(1, ts);
+            stmt.setLong(2, (incrementingOffset == null ? -1 : incrementingOffset));
+            stmt.setTimestamp(3, ts);
+        } else if (incrementingColumn != null) {
+            stmt.setLong(1, (incrementingOffset == null ? -1 : incrementingOffset));
+        } else if (timestampColumn != null) {
+            Timestamp ts = new Timestamp(timestampOffset == null ? 0 : timestampOffset);
+            stmt.setTimestamp(1, ts);
+        }
+        log.info("{}·", stmt);
+        log.info("{},{}", incrementingOffset, timestampOffset);
+        return stmt.executeQuery();
+    }
+
+    public List<Table> getList() {
+        return list;
+    }
+
+    public void setList(List<Table> list) {
+        this.list = list;
+    }
+
+    public void poll() {
+        try {
+            list.clear();
+            Connection conn = dataSource.getConnection();
+            for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
+                String db = entry.getKey();
+                if (!db.contains("time_db"))
+                    continue;
+                log.info("{} database is loading", db);
+                Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, Table> tableEntry = iterator.next();
+                    String tb = tableEntry.getKey();
+                    log.info("{} table is loading", tb);
+                    name = db + "." + tb;
+                    storeRecord(name);
+                    createPreparedStatement(conn);
+                    ResultSet rs;
+                    rs = executeQuery();
+                    List<String> colList = tableEntry.getValue().getColList();
+                    List<String> DataTypeList = tableEntry.getValue().getRawDataTypeList();
+                    List<ColumnParser> ParserList = tableEntry.getValue().getParserList();
+
+                    while (rs.next()) {
+                        Table table = new Table(db, tb);
+                        System.out.print("|");
+                        table.setColList(colList);
+                        table.setRawDataTypeList(DataTypeList);
+                        table.setParserList(ParserList);
+                        for (String string : colList) {
+                            table.getDataList().add(rs.getObject(string));
+                            System.out.print(string + " : " + rs.getObject(string) + "|");
+                        }
+                        incrementingOffset = incrementingOffset > rs.getInt(incrementingColumn) ? incrementingOffset
+                                : rs.getInt(incrementingColumn);
+                        timestampOffset = timestampOffset > rs.getTimestamp(timestampColumn).getTime() ? timestampOffset
+                                : rs.getTimestamp(timestampColumn).getTime();
+                        System.out.println(timestampOffset);
+                        list.add(table);
+                        System.out.println();
+                    }
+                    extractRecord(name);
+                    incrementingOffset = 0L;
+                    timestampOffset = 0L;
+                }
+            }
+            conn.close();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void start() throws Exception {
+        try {
+            initDataSource();
+            if (incrementingColumn != null && timestampColumn != null) {
+                incrementingMap = new HashMap<>();
+                timestampMap = new HashMap<>();
+            } else if (incrementingColumn != null) {
+                incrementingMap = new HashMap<>();
+            } else if (timestampColumn != null) {
+                timestampMap = new HashMap<>();
+            }
+        } catch (Throwable exception) {
+            log.info("error,{}", exception);
+        }
+        schema = new Schema(dataSource);
+        schema.load();
+    }
+
+    private void initDataSource() throws Exception {
+        Map<String, String> map = new HashMap<>();
+        config = super.getConfig();
+        timestampColumn = config.getTimestampColmnName();
+        incrementingColumn = config.getIncrementingColumnName();
+        map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
+        map.put("url",
+                "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+        map.put("username", config.jdbcUsername);
+        map.put("password", config.jdbcPassword);
+        map.put("initialSize", "2");
+        map.put("maxActive", "2");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+        log.info("{}config read successfully", map);
+        try {
+            dataSource = DruidDataSourceFactory.createDataSource(map);
+        } catch (Exception exception) {
+            log.info("exeception,{}", exception);
+        } catch (Error error) {
+            log.info("error,{}", error);
+        }
+        log.info("datasorce success");
+    }
+}
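
To make the query-construction comment in createPreparedStatement concrete: assuming
timestampColumn is "timestamp", incrementingColumn is "id", the table name is
time_db.timestamp_tb (the names used by the test below), and MySQL's backtick identifier
quoting, the builder produces a statement of the following shape (wrapped here for
readability), and executeQuery binds parameters 1 and 3 to the last seen timestamp and
parameter 2 to the last seen incrementing id:

    SELECT * FROM time_db.timestamp_tb
    WHERE `timestamp` < CURRENT_TIMESTAMP
      AND ((`timestamp` = ? AND `id` > ?) OR `timestamp` > ?)
    ORDER BY `timestamp`,`id` ASC
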
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
index f9c8c6f..429494b 100644
--- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
@@ -16,29 +16,88 @@
  */
 
 package org.apache.rocketmq.connect.jdbc.connector;
+
 import java.util.Collection;
-import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.sql.DataSource;
 
+import org.junit.Test;
+import java.sql.*;
+import com.alibaba.druid.pool.DruidDataSourceFactory;
 
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.internal.DefaultKeyValue;
 
 public class JdbcSourceTaskTest {
+	KeyValue kv;
+	DataSource dataSource;
+
+	@Test
+	public void testBulk() throws InterruptedException {
+		KeyValue kv = new DefaultKeyValue();
+		kv.put("jdbcUrl", "localhost:3306");
+		kv.put("jdbcUsername", "root");
+		kv.put("jdbcPassword", "199812160");
+		kv.put("mode", "bulk");
+		kv.put("rocketmqTopic", "JdbcTopic");
+		JdbcSourceTask task = new JdbcSourceTask();
+		task.start(kv);
+		Collection<SourceDataEntry> sourceDataEntry = task.poll();
+		System.out.println(sourceDataEntry);
+	}
+
+	@Test
+	public void testTimestampIncrementing() throws InterruptedException, SQLException {
+		kv = new DefaultKeyValue();
+		kv.put("jdbcUrl", "localhost:3306");
+		kv.put("jdbcUsername", "root");
+		kv.put("jdbcPassword", "199812160");
+		kv.put("incrementingColumnName", "id");
+		kv.put("timestampColmnName", "timestamp");
+		kv.put("mode", "incrementing+timestamp");
+		kv.put("rocketmqTopic", "JdbcTopic");
+		JdbcSourceTask task = new JdbcSourceTask();
+		task.start(kv);
+		Collection<SourceDataEntry> sourceDataEntry = task.poll();
+		System.out.println(sourceDataEntry);
+		Map<String, String> map = new HashMap<>();
+		map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
+		map.put("url", "jdbc:mysql://" + kv.getString("jdbcUrl")
+				+ "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+		map.put("username", kv.getString("jdbcUsername"));
+		map.put("password", kv.getString("jdbcPassword"));
+		map.put("initialSize", "2");
+		map.put("maxActive", "2");
+		map.put("maxWait", "60000");
+		map.put("timeBetweenEvictionRunsMillis", "60000");
+		map.put("minEvictableIdleTimeMillis", "300000");
+		map.put("validationQuery", "SELECT 1 FROM DUAL");
+		map.put("testWhileIdle", "true");
+		try {
+			dataSource = DruidDataSourceFactory.createDataSource(map);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
 
+		Connection connection= dataSource.getConnection();
+		PreparedStatement statement;
+		String s="insert into time_db.timestamp_tb (name) values(\"test\")";
+		statement=connection.prepareStatement(s);
+		statement.executeUpdate();
 
-    @Test
-    public void test() throws InterruptedException {
-        KeyValue kv = new DefaultKeyValue();
-        kv.put("jdbcUrl","localhost:3306");
-        kv.put("jdbcUsername","root");
-        kv.put("jdbcPassword","199812160");
-        kv.put("mode","bulk");
-        kv.put("rocketmqTopic","JdbcTopic");
-        JdbcSourceTask task = new JdbcSourceTask();
-        task.start(kv);
-            Collection<SourceDataEntry> sourceDataEntry = task.poll();
-            System.out.println(sourceDataEntry);
-
-    }
+		sourceDataEntry = task.poll();
+		System.out.println(sourceDataEntry);
+		s="update time_db.timestamp_tb set name=\"liu\" where id < 2";
+		statement=connection.prepareStatement(s);
+		statement.executeUpdate();
+		sourceDataEntry = task.poll();
+		System.out.println(sourceDataEntry);
+		task.stop();
+		
+		connection.close();
+	}
 }
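
The tests above assume a locally running MySQL instance with databases that the commit itself
does not create. A hypothetical setup that matches what the code and tests reference (jdbc_db
for bulk mode; time_db.timestamp_tb with an auto-increment id, a name column, and a timestamp
column for the incrementing+timestamp test) could look like the following; the schema actually
used by the author is not part of this commit:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class TestSchemaSetup {
        public static void main(String[] args) throws Exception {
            // placeholder connection settings, matching the ones used in JdbcSourceTaskTest
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/?useSSL=false", "root", "secret");
                 Statement stmt = conn.createStatement()) {
                stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS jdbc_db");  // scanned by the bulk Querier
                stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS time_db");  // scanned by TimestampIncrementingQuerier
                stmt.executeUpdate("CREATE TABLE IF NOT EXISTS time_db.timestamp_tb ("
                        + "id INT AUTO_INCREMENT PRIMARY KEY, "
                        + "name VARCHAR(64), "
                        + "`timestamp` TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
                        + "ON UPDATE CURRENT_TIMESTAMP)");
            }
        }
    }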