You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:25 UTC
[rocketmq-connect] 17/43: Develop TimestampIncrementingQuerier Mode
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 1376d8206633c494dab351b0b88723be0a6af102
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Thu Aug 15 21:57:54 2019 +0800
Develop TimestampIncrementingQuerier Mode
---
.../org/apache/rocketmq/connect/jdbc/Config.java | 7 +-
.../jdbc/connector/JdbcSourceConnector.java | 18 +-
.../connect/jdbc/connector/JdbcSourceTask.java | 100 +++++--
.../rocketmq/connect/jdbc/source/JdbcUtils.java | 198 +++++++++++++
.../rocketmq/connect/jdbc/source/Querier.java | 77 +++--
.../jdbc/source/TimestampIncrementingQuerier.java | 315 +++++++++++++++++++++
.../connect/jdbc/connector/JdbcSourceTaskTest.java | 89 +++++-
7 files changed, 727 insertions(+), 77 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
index 533d53b..f93c4db 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
@@ -27,8 +27,6 @@ import java.util.List;
import java.util.Set;
public class Config {
- @SuppressWarnings("serial")
-
private static final Logger LOG = LoggerFactory.getLogger(Config.class);
/* Database Connection Config */
@@ -68,11 +66,12 @@ public class Config {
add("jdbcUrl");
add("jdbcUsername");
add("jdbcPassword");
- // add("mode");
- // add("rocketmqTopic");
+ add("mode");
+ add("rocketmqTopic");
}
};
+
public void load(KeyValue props) {
log.info("Config.load.start");
properties2Object(props, this);
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index bdbeb8b..4a870c0 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -19,17 +19,15 @@ package org.apache.rocketmq.connect.jdbc.connector;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
-
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
import org.apache.rocketmq.connect.jdbc.Config;
-import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+
public class JdbcSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnector.class);
private KeyValue config;
@@ -37,7 +35,7 @@ public class JdbcSourceConnector extends SourceConnector {
@Override
public String verifyAndSetConfig(KeyValue config) {
- log.info("JdbcSourceConnector verifyAndSetConfig enter");
+ log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter");
for (String requestKey : Config.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
@@ -59,11 +57,13 @@ public class JdbcSourceConnector extends SourceConnector {
}
- @Override public void pause() {
+ @Override
+ public void pause() {
}
- @Override public void resume() {
+ @Override
+ public void resume() {
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 91659ec..45252bb 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -1,5 +1,4 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -20,20 +19,31 @@
package org.apache.rocketmq.connect.jdbc.connector;
import io.openmessaging.connector.api.source.SourceTask;
+
import java.nio.ByteBuffer;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
import org.apache.rocketmq.connect.jdbc.Config;
import org.apache.rocketmq.connect.jdbc.schema.Table;
import org.apache.rocketmq.connect.jdbc.source.Querier;
+import org.apache.rocketmq.connect.jdbc.source.TimestampIncrementingQuerier;
import org.apache.rocketmq.connect.jdbc.schema.column.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
+
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.EntryType;
import io.openmessaging.connector.api.data.Schema;
@@ -49,26 +59,31 @@ public class JdbcSourceTask extends SourceTask {
private Config config;
- private List<Table> list = new LinkedList<>();
-
- Querier querier = new Querier();
+ BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>();
+ static final String INCREMENTING_FIELD = "incrementing";
+ static final String TIMESTAMP_FIELD = "timestamp";
+ private Querier querier;
@Override
public Collection<SourceDataEntry> poll() {
List<SourceDataEntry> res = new ArrayList<>();
try {
-
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("nextQuery", "database");
- jsonObject.put("nextPosition", "10");
- //To be Continued
+ if (tableQueue.size() > 1)
+ querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+ else
+ querier = tableQueue.peek();
+ Timer timer = new java.util.Timer();
+ try {
+ Thread.currentThread();
+ Thread.sleep(1000);//毫秒
+ } catch (Exception e) {
+ throw e;
+ }
querier.poll();
- log.info("querier.poll, start");
- int mm = 0;
for (Table dataRow : querier.getList()) {
- System.out.println(dataRow.getColList().get(0));
- log.info("xunhuankaishi");
- log.info("Received {} record: {} ", dataRow.getColList().get(0), mm++);
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("nextQuery", "database");
+ jsonObject.put("nextPosition", "table");
Schema schema = new Schema();
schema.setDataSource(dataRow.getDatabase());
schema.setName(dataRow.getName());
@@ -80,37 +95,60 @@ public class JdbcSourceTask extends SourceTask {
schema.getFields().add(field);
}
DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(dataRow.getName())
- .entryType(EntryType.CREATE);
+ dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
+ .entryType(EntryType.CREATE);
for (int i = 0; i < dataRow.getColList().size(); i++) {
Object value = dataRow.getDataList().get(i);
- System.out.println(1);
- System.out.println(dataRow.getColList().get(i) + "|" + value);
- dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSON.toJSONString(value));
+ // System.out.println(dataRow.getColList().get(i) + "|" + value);
+ dataEntryBuilder.putFiled(dataRow.getColList().get(i), value);
}
+
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")),
- ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
+ ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")),
+ ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
res.add(sourceDataEntry);
+
}
} catch (Exception e) {
log.error("JDBC task poll error, current config:" + JSON.toJSONString(config), e);
}
+ log.info("dataEntry poll successfully,{}", res);
return res;
}
@Override
public void start(KeyValue props) {
try {
- this.config = new Config();
- this.config.load(props);
- log.info("querier.start");
- querier.start();
-
+ config = new Config();
+ config.load(props);
} catch (Exception e) {
- log.error("JDBC task start failed.", e);
+ log.error("Cannot start Jdbc Source Task because of configuration error{}", e);
+ }
+ Map<Map<String, String>, Map<String, Object>> offsets = null;
+ String mode = config.mode;
+ if (mode.equals("bulk")) {
+ Querier querier = new Querier();
+ try {
+ querier.setConfig(config);
+ querier.start();
+ tableQueue.add(querier);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ } else {
+ TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier();
+ try {
+ querier.setConfig(config);
+ querier.start();
+ tableQueue.add(querier);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
}
+
}
@Override
@@ -118,11 +156,13 @@ public class JdbcSourceTask extends SourceTask {
querier.stop();
}
- @Override public void pause() {
+ @Override
+ public void pause() {
}
- @Override public void resume() {
+ @Override
+ public void resume() {
}
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java
new file mode 100644
index 0000000..cbcca6a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/JdbcUtils.java
@@ -0,0 +1,198 @@
+
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.rocketmq.connect.jdbc.source;
+import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+
+/**
+ * Utilities for interacting with a JDBC database.
+ */
+public class JdbcUtils {
+
+    // Log under this class's own category; using JdbcSourceTask.class here made every message
+    // from these utilities appear to come from the source task.
+    private static final Logger log = LoggerFactory.getLogger(JdbcUtils.class);
+
+ /**
+ * The default table types to include when listing tables if none are specified. Valid values
+ * are those specified by the {@link java.sql.DatabaseMetaData#getTables} method's TABLE_TYPE column.
+ * The default only includes standard, user-defined tables.
+ */
+ public static final Set<String> DEFAULT_TABLE_TYPES = Collections.unmodifiableSet(
+ new HashSet<String>(Arrays.asList("TABLE"))
+ );
+
+ private static final int GET_TABLES_TYPE_COLUMN = 4;
+ private static final int GET_TABLES_NAME_COLUMN = 3;
+
+ private static final int GET_COLUMNS_COLUMN_NAME = 4;
+ private static final int GET_COLUMNS_IS_NULLABLE = 18;
+ private static final int GET_COLUMNS_IS_AUTOINCREMENT = 23;
+
+
+ private static ThreadLocal<SimpleDateFormat> DATE_FORMATTER = new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return sdf;
+ }
+ };
+
+    /**
+     * Get a list of tables in the database. This uses the default filters, which only include
+     * user-defined tables.
+     *
+     * @param conn database connection
+     * @return a list of table names
+     * @throws SQLException if the database metadata cannot be read
+     */
+    public static List<String> getTables(Connection conn) throws SQLException {
+        return getTables(conn, DEFAULT_TABLE_TYPES);
+    }
+
+    /**
+     * Get a list of table names in the database.
+     *
+     * @param conn database connection
+     * @param types a set of table types that should be included in the results
+     * @return the names of all tables whose TABLE_TYPE is contained in {@code types}
+     * @throws SQLException if the database metadata cannot be read
+     */
+    public static List<String> getTables(Connection conn, Set<String> types) throws SQLException {
+        DatabaseMetaData metadata = conn.getMetaData();
+        // Loop-invariant: ask the driver for the product name once rather than once per row.
+        // Constant-first equals also tolerates a driver that reports no product name.
+        boolean isSqlite = "SQLite".equals(metadata.getDatabaseProductName());
+        List<String> tableNames = new ArrayList<String>();
+        // try-with-resources releases the metadata cursor even if iteration fails.
+        try (ResultSet rs = metadata.getTables(null, null, "%", null)) {
+            while (rs.next()) {
+                if (!types.contains(rs.getString(GET_TABLES_TYPE_COLUMN))) {
+                    continue;
+                }
+                String tableName = rs.getString(GET_TABLES_NAME_COLUMN);
+                // SQLite JDBC driver does not correctly mark these as system tables
+                if (isSqlite && tableName.startsWith("sqlite_")) {
+                    continue;
+                }
+                tableNames.add(tableName);
+            }
+        }
+        return tableNames;
+    }
+
+    /**
+     * Look up the autoincrement column for the specified table.
+     *
+     * @param conn database connection
+     * @param table the table whose columns are inspected
+     * @return the name of the column that is an autoincrement column, or null if there is no
+     *         autoincrement column or more than one exists
+     * @throws SQLException if the metadata lookup or the fallback query fails
+     */
+    public static String getAutoincrementColumn(Connection conn, String table) throws SQLException {
+        String result = null;
+        int matches = 0;
+
+        try (ResultSet columns = conn.getMetaData().getColumns(null, null, table, "%")) {
+            // Some database drivers (SQLite) don't include all the columns
+            if (columns.getMetaData().getColumnCount() >= GET_COLUMNS_IS_AUTOINCREMENT) {
+                while (columns.next()) {
+                    // Constant-first equals: guards against a driver returning null here.
+                    if ("YES".equals(columns.getString(GET_COLUMNS_IS_AUTOINCREMENT))) {
+                        result = columns.getString(GET_COLUMNS_COLUMN_NAME);
+                        matches++;
+                    }
+                }
+                return matches == 1 ? result : null;
+            }
+        }
+
+        // Fallback approach is to query for a single row and inspect the result set metadata.
+        log.trace("Falling back to SELECT detection of auto-increment column for {}:{}", conn, table);
+        String quoteString = getIdentifierQuoteString(conn);
+        String sql = "SELECT * FROM " + quoteString + table + quoteString + " LIMIT 1";
+        try (Statement stmt = conn.createStatement();
+             ResultSet rs = stmt.executeQuery(sql)) {
+            ResultSetMetaData rsmd = rs.getMetaData();
+            // JDBC column indexes are 1-based and run up to getColumnCount() inclusive; the
+            // previous "<" bound never looked at the last column.
+            for (int i = 1; i <= rsmd.getColumnCount(); i++) {
+                if (rsmd.isAutoIncrement(i)) {
+                    result = rsmd.getColumnName(i);
+                    matches++;
+                }
+            }
+        }
+        return matches == 1 ? result : null;
+    }
+
+    /**
+     * Check whether the JDBC metadata reports a column as nullable.
+     *
+     * @param conn database connection
+     * @param table the table that owns the column
+     * @param column the column to look up
+     * @return true only if the metadata row for the column has IS_NULLABLE equal to "YES";
+     *         false if the column is not found, the driver does not expose IS_NULLABLE, or
+     *         any other value is reported
+     * @throws SQLException if the metadata lookup fails
+     */
+    public static boolean isColumnNullable(Connection conn, String table, String column)
+        throws SQLException {
+        try (ResultSet rs = conn.getMetaData().getColumns(null, null, table, column)) {
+            // IS_NULLABLE is column 18, so a result with exactly 18 columns already carries it;
+            // the previous strict ">" rejected such drivers.
+            if (rs.getMetaData().getColumnCount() < GET_COLUMNS_IS_NULLABLE) {
+                return false;
+            }
+            // Should only be one match
+            if (!rs.next()) {
+                return false;
+            }
+            return "YES".equals(rs.getString(GET_COLUMNS_IS_NULLABLE));
+        }
+    }
+
+    /**
+     * Render a date as a UTC timestamp string ({@code yyyy-MM-dd HH:mm:ss.SSS}) suitable for SQL.
+     *
+     * @param date the date to convert to a String
+     * @return the formatted string
+     */
+    public static String formatUTC(Date date) {
+        // Each thread gets its own formatter instance from the thread-local.
+        SimpleDateFormat utcFormat = DATE_FORMATTER.get();
+        return utcFormat.format(date);
+    }
+
+    /**
+     * Return the identifier quote string reported by the driver for this database.
+     *
+     * @param connection the database connection
+     * @return the quote string, or an empty string when the driver reports {@code null}
+     * @throws SQLException if the database metadata cannot be read
+     */
+    public static String getIdentifierQuoteString(Connection connection) throws SQLException {
+        String reported = connection.getMetaData().getIdentifierQuoteString();
+        if (reported == null) {
+            return "";
+        }
+        return reported;
+    }
+
+    /**
+     * Wrap a string in the given quote string on both sides.
+     *
+     * @param orig the string to quote
+     * @param quote the quote character
+     * @return {@code quote}, followed by {@code orig}, followed by {@code quote}
+     */
+    public static String quoteString(String orig, String quote) {
+        StringBuilder quoted = new StringBuilder();
+        quoted.append(quote);
+        quoted.append(orig);
+        quoted.append(quote);
+        return quoted.toString();
+    }
+}
+
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index 073a896..0907d40 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -13,27 +13,55 @@ import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-
import javax.sql.DataSource;
-
import org.apache.rocketmq.connect.jdbc.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.alibaba.druid.pool.DruidDataSourceFactory;
-
import org.apache.rocketmq.connect.jdbc.schema.*;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
public class Querier {
- static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+
private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass
protected String topicPrefix;
protected String jdbcUrl;
private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
- private Config config = new Config();
+ private Config config;
+
+ /**
+ * @return the config
+ */
+ public Config getConfig() {
+ return config;
+ }
+
+ public void setConfig(Config config) {
+ this.config = config;
+ log.info("config load successfully");
+ }
+
private DataSource dataSource;
private List<Table> list = new LinkedList<>();
+ private String mode;
+
+
+ public DataSource getDataSource() {
+ return dataSource;
+ }
+
+ public void setDataSource(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public void setMode(String mode) {
+ this.mode = mode;
+ }
+
public List<Table> getList() {
return list;
@@ -44,7 +72,6 @@ public class Querier {
}
public Connection getConnection() throws SQLException {
-
// These config names are the same for both source and sink configs ...
String username = config.jdbcUsername;
String dbPassword = config.jdbcPassword;
@@ -76,7 +103,7 @@ public class Querier {
protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException {
String SQL = "select table_name,column_name,data_type,column_type,character_set_name "
- + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION";
+ + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION";
log.trace("Creating a PreparedStatement '{}'", SQL);
PreparedStatement stmt = db.prepareStatement(SQL);
@@ -96,30 +123,29 @@ public class Querier {
return stmt.executeQuery();
}
+ private Schema schema;
+
public static void main(String[] args) throws Exception {
- Querier querier = new Querier();
+ TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier();
try {
querier.start();
querier.poll();
-
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
-
}
- private Schema schema;
-
public void poll() {
try {
PreparedStatement stmt;
String query = "select * from ";
Connection conn = dataSource.getConnection();
-
for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
String db = entry.getKey();
+ if (!db.contains("jdbc_db"))
+ continue;
Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Table> tableEntry = iterator.next();
@@ -155,7 +181,13 @@ public class Querier {
}
public void start() throws Exception {
- initDataSource();
+ try {
+
+ log.info("datasorce success");
+ initDataSource();
+ } catch (Throwable exception) {
+ log.info("error,{}", exception);
+ }
schema = new Schema(dataSource);
schema.load();
log.info("schema load successful");
@@ -163,9 +195,10 @@ public class Querier {
private void initDataSource() throws Exception {
Map<String, String> map = new HashMap<>();
+
map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
map.put("url",
- "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+ "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
map.put("username", config.jdbcUsername);
map.put("password", config.jdbcPassword);
map.put("initialSize", "2");
@@ -175,9 +208,15 @@ public class Querier {
map.put("minEvictableIdleTimeMillis", "300000");
map.put("validationQuery", "SELECT 1 FROM DUAL");
map.put("testWhileIdle", "true");
- log.info("{},config read successful", map);
- dataSource = DruidDataSourceFactory.createDataSource(map);
-
+ log.info("{} config read successful", map);
+ try {
+ dataSource = DruidDataSourceFactory.createDataSource(map);
+ } catch (Exception exception) {
+ log.info("exeception,{}", exception);
+ } catch (Error e) {
+ log.info("error,{},e", e);
+ }
+ log.info("datasorce success");
}
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
new file mode 100644
index 0000000..1dadc4f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java
@@ -0,0 +1,315 @@
+package org.apache.rocketmq.connect.jdbc.source;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.sql.DataSource;
+
+import org.apache.rocketmq.connect.jdbc.Config;
+import org.apache.rocketmq.connect.jdbc.schema.Database;
+import org.apache.rocketmq.connect.jdbc.schema.Schema;
+import org.apache.rocketmq.connect.jdbc.schema.Table;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+
+
+public class TimestampIncrementingQuerier extends Querier {
+ protected PreparedStatement stmt;
+ static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+ protected String jdbcUrl;
+
+ private Config config;
+ private DataSource dataSource;
+ private static final Logger log = LoggerFactory.getLogger(TimestampIncrementingQuerier.class);
+ private List<Table> list = new LinkedList<>();
+ private HashMap<String, Long> incrementingMap;
+ private HashMap<String, Timestamp> timestampMap;
+ private static final Calendar UTC_CALENDAR = new GregorianCalendar(TimeZone.getTimeZone("UTC+8"));
+ private String timestampColumn = "";
+ static final String INCREMENTING_FIELD = "incrementing";
+ static final String TIMESTAMP_FIELD = "timestamp";
+ private Map<String, Long> offset;
+ private Long timestampOffset;
+ private String incrementingColumn = "";
+ private Map<String, String> partition;
+ private Schema schema;
+
+ /**
+ * @return the name of the timestamp column used when building the incremental query
+ */
+ public String getTimestampColumn() {
+ return timestampColumn;
+ }
+
+ /**
+ * Sets the timestamp column name. Note that initDataSource() (run by start()) assigns this
+ * field again from the loaded configuration.
+ */
+ public void setTimestampColumn(String timestampColumn) {
+ this.timestampColumn = timestampColumn;
+ }
+
+ /**
+ * @return the name of the incrementing column used when building the incremental query
+ */
+ public String getIncrementingColumn() {
+ return incrementingColumn;
+ }
+
+ /**
+ * Sets the incrementing column name. Note that initDataSource() (run by start()) assigns this
+ * field again from the loaded configuration.
+ */
+ public void setIncrementingColumn(String incrementingColumn) {
+ this.incrementingColumn = incrementingColumn;
+ }
+
+ private Long incrementingOffset = null;
+ private String name;
+
+    /**
+     * Saves the current working offsets for {@code name} into the per-table offset maps:
+     * the incrementing offset when an incrementing column is configured, and the timestamp
+     * offset (as a {@link Timestamp}) when a timestamp column is configured.
+     *
+     * @param name the qualified table name used as the map key
+     * @throws SQLException declared for callers; not thrown by the statements in this method
+     */
+    public void extractRecord(String name) throws SQLException {
+        final boolean tracksIncrementing = incrementingColumn != null;
+        if (tracksIncrementing) {
+            log.info("{}", name);
+            incrementingMap.put(name, incrementingOffset);
+        }
+
+        final boolean tracksTimestamp = timestampColumn != null;
+        if (tracksTimestamp) {
+            Timestamp latest = new Timestamp(timestampOffset);
+            timestampMap.put(name, latest);
+        }
+    }
+
+    /**
+     * Loads the offsets last saved for {@code name} into the working fields
+     * ({@code incrementingOffset}, {@code timestampOffset}) that {@code executeQuery()} binds
+     * into the incremental query, and rebuilds the {@code offset} and {@code partition} maps.
+     * A table without a saved offset starts from id 0 and the epoch (timestamp 0).
+     *
+     * @param name the qualified table name used as the key of the offset maps
+     * @throws SQLException declared for callers; not thrown by the statements in this method
+     */
+    public void storeRecord(String name) throws SQLException {
+        offset = new HashMap<>();
+        if (incrementingColumn != null) {
+            Long id = incrementingMap.get(name);
+            if (id == null) {
+                id = 0L;
+            }
+            // The former assertion "id > incrementingOffset" compared against a field that
+            // poll() resets to 0 between tables, so it failed spuriously when assertions were
+            // enabled; it has been removed.
+            incrementingOffset = id;
+            offset.put(INCREMENTING_FIELD, id);
+        }
+        if (timestampColumn != null) {
+            Timestamp timestamp = timestampMap.get(name);
+            // Epoch milliseconds are time-zone independent. The previous code added
+            // Timestamp.getTimezoneOffset(), which is expressed in MINUTES, to a millisecond
+            // value and so shifted the offset by a few hundred milliseconds.
+            timestampOffset = timestamp == null ? 0L : timestamp.getTime();
+            offset.put(TIMESTAMP_FIELD, timestampOffset);
+            // Logged only here: timestampOffset may be null when no timestamp column is
+            // configured, and unboxing it for the log message used to throw.
+            log.info("{}store", new Timestamp(timestampOffset));
+        }
+        partition = Collections.singletonMap("table", name);
+    }
+
+ /**
+ * Builds the incremental SELECT for the table named by the {@code name} field and stores the
+ * resulting prepared statement in {@code stmt}. The "?" placeholders are bound later by
+ * executeQuery().
+ *
+ * Query shape, depending on which columns are configured:
+ * - both columns: rows older than CURRENT_TIMESTAMP that either share the last timestamp and
+ * have a larger id, or have a larger timestamp; ordered by timestamp, then id
+ * - incrementing only: rows with id greater than the last id; ordered by id
+ * - timestamp only: rows newer than the last timestamp and older than CURRENT_TIMESTAMP;
+ * ordered by timestamp
+ * - neither: a plain "SELECT * FROM name" with no WHERE clause
+ *
+ * Column names are wrapped in the driver's identifier quote string; the table name is appended
+ * as-is.
+ *
+ * NOTE(review): a statement previously held in {@code stmt} is overwritten here without being
+ * closed - confirm it is closed elsewhere.
+ *
+ * @param conn the connection used to prepare the statement
+ * @throws SQLException if metadata lookup or statement preparation fails
+ */
+ protected void createPreparedStatement(Connection conn) throws SQLException {
+ // Default when unspecified (empty string) uses the table's autoincrementing column
+ if (incrementingColumn != null && incrementingColumn.isEmpty()) {
+ incrementingColumn = JdbcUtils.getAutoincrementColumn(conn, name);
+ }
+
+ String quoteString = conn.getMetaData().getIdentifierQuoteString();
+ StringBuilder builder = new StringBuilder();
+ builder.append("SELECT * FROM ");
+ builder.append(name);
+
+ // A null quote string from the driver is treated as "no quoting"
+ quoteString = quoteString == null ? "" : quoteString;
+
+ if (incrementingColumn != null && timestampColumn != null) {
+ // This version combines two possible conditions.
+ //
+ // The first checks timestamp == last timestamp and incrementing > last incrementing.
+ // The timestamp alone would include duplicates, but adding the incrementing condition
+ // ensures no duplicates, e.g. you would get only the row with id = 23:
+ // timestamp 1234, id 22 <- last
+ // timestamp 1234, id 23
+ //
+ // The second check only uses timestamp > last timestamp. This covers everything new,
+ // even if it is an update of an existing row. If we previously had:
+ // timestamp 1234, id 22 <- last
+ // and then these rows were written:
+ // timestamp 1235, id 22
+ // timestamp 1236, id 23
+ // we should capture both id = 22 (an update) and id = 23 (a new row).
+ String timeString = quoteString + timestampColumn + quoteString;
+ String incrString = quoteString + incrementingColumn + quoteString;
+ builder.append(" WHERE ");
+ builder.append(timeString);
+ builder.append(" < CURRENT_TIMESTAMP AND ((");
+ builder.append(timeString);
+ builder.append(" = ? AND ");
+ builder.append(incrString);
+ builder.append(" > ?");
+ builder.append(") OR ");
+ builder.append(timeString);
+ builder.append(" > ?)");
+ builder.append(" ORDER BY ");
+ builder.append(timeString);
+ builder.append(",");
+ builder.append(incrString);
+ builder.append(" ASC");
+
+ } else if (incrementingColumn != null) {
+ String incrString = quoteString + incrementingColumn + quoteString;
+ builder.append(" WHERE ");
+ builder.append(incrString);
+ builder.append(" > ?");
+ builder.append(" ORDER BY ");
+ builder.append(incrString);
+ builder.append(" ASC");
+ } else if (timestampColumn != null) {
+ String timeString = quoteString + timestampColumn + quoteString;
+ builder.append(" WHERE ");
+ builder.append(timeString);
+ builder.append(" > ? AND ");
+ builder.append(timeString);
+ builder.append(" < CURRENT_TIMESTAMP ORDER BY ");
+ builder.append(timeString);
+ builder.append(" ASC");
+ }
+ String queryString = builder.toString();
+ stmt = conn.prepareStatement(queryString);
+ log.info(queryString);
+ }
+
+ /**
+ * Manual smoke-test entry point: creates a querier, starts it and runs a single poll.
+ *
+ * NOTE(review): no Config is set on the querier before start() is called here, while
+ * initDataSource() reads its settings from getConfig() - confirm this entry point is still
+ * usable or remove it.
+ */
+ public static void main(String[] args) throws Exception {
+ TimestampIncrementingQuerier querier = new TimestampIncrementingQuerier();
+ try {
+ querier.start();
+ querier.poll();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+    /**
+     * Binds the current working offsets to the statement prepared by
+     * {@code createPreparedStatement} and runs it. A missing incrementing offset is bound as
+     * -1 and a missing timestamp offset as the epoch (0).
+     *
+     * @return the result set of the incremental query
+     * @throws SQLException if binding a parameter or executing the query fails
+     */
+    protected ResultSet executeQuery() throws SQLException {
+        final boolean hasIncrementing = incrementingColumn != null;
+        final boolean hasTimestamp = timestampColumn != null;
+        final long lastId = incrementingOffset == null ? -1 : incrementingOffset;
+        final long lastMillis = timestampOffset == null ? 0 : timestampOffset;
+
+        if (hasIncrementing && hasTimestamp) {
+            // Placeholder order: timestamp = ?, incrementing > ?, timestamp > ?
+            Timestamp since = new Timestamp(lastMillis);
+            stmt.setTimestamp(1, since);
+            stmt.setLong(2, lastId);
+            stmt.setTimestamp(3, since);
+        } else if (hasIncrementing) {
+            stmt.setLong(1, lastId);
+        } else if (hasTimestamp) {
+            stmt.setTimestamp(1, new Timestamp(lastMillis));
+        }
+
+        log.info("{}·", stmt);
+        log.info("{},{}", incrementingOffset, timestampOffset);
+        return stmt.executeQuery();
+    }
+
+ /**
+ * @return the rows collected by the most recent poll(); this is the internal list itself,
+ * which poll() clears at the start of every run
+ */
+ public List<Table> getList() {
+ return list;
+ }
+
+ /**
+ * Replaces the internal result list.
+ */
+ public void setList(List<Table> list) {
+ this.list = list;
+ }
+
+    /**
+     * Polls every table of the matching database for rows added or changed since the last
+     * poll and collects them, one {@link Table} per row, into the list returned by
+     * {@code getList()}. The list is cleared first.
+     *
+     * A {@link SQLException} is logged and ends the current poll; rows collected before the
+     * failure stay in the list.
+     */
+    public void poll() {
+        list.clear();
+        // try-with-resources returns the connection to the pool even when a query fails;
+        // previously an exception skipped conn.close() and leaked the pooled connection.
+        try (Connection conn = dataSource.getConnection()) {
+            for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
+                String db = entry.getKey();
+                // NOTE(review): the database filter is hard-coded; consider moving it to Config.
+                if (!db.contains("time_db")) {
+                    continue;
+                }
+                log.info("{} database is loading", db);
+                for (Map.Entry<String, Table> tableEntry : entry.getValue().tableMap.entrySet()) {
+                    pollTable(conn, db, tableEntry.getKey(), tableEntry.getValue());
+                }
+            }
+        } catch (SQLException e) {
+            log.error("Timestamp/incrementing poll failed", e);
+        }
+    }
+
+    /** Runs the incremental query for one table and appends every returned row to the list. */
+    private void pollTable(Connection conn, String db, String tb, Table template) throws SQLException {
+        log.info("{} table is loading", tb);
+        name = db + "." + tb;
+        storeRecord(name);
+        createPreparedStatement(conn);
+
+        List<String> colList = template.getColList();
+        List<String> dataTypeList = template.getRawDataTypeList();
+        List<ColumnParser> parserList = template.getParserList();
+        try (ResultSet rs = executeQuery()) {
+            while (rs.next()) {
+                Table table = new Table(db, tb);
+                table.setColList(colList);
+                table.setRawDataTypeList(dataTypeList);
+                table.setParserList(parserList);
+                for (String column : colList) {
+                    table.getDataList().add(rs.getObject(column));
+                }
+                advanceOffsets(rs);
+                list.add(table);
+            }
+        } finally {
+            // One statement is prepared per table; close it so statements do not pile up.
+            stmt.close();
+        }
+
+        extractRecord(name);
+        // Reset the working offsets before the next table is processed.
+        incrementingOffset = 0L;
+        timestampOffset = 0L;
+    }
+
+    /** Raises the working offsets to the values of the current row, for each configured column. */
+    private void advanceOffsets(ResultSet rs) throws SQLException {
+        if (incrementingColumn != null) {
+            // getLong: the offset is a Long, and getInt would truncate large ids.
+            long id = rs.getLong(incrementingColumn);
+            if (incrementingOffset == null || id > incrementingOffset) {
+                incrementingOffset = id;
+            }
+        }
+        if (timestampColumn != null) {
+            Timestamp rowTime = rs.getTimestamp(timestampColumn);
+            // A NULL timestamp cannot advance the offset; skip it instead of failing the poll.
+            if (rowTime != null && (timestampOffset == null || rowTime.getTime() > timestampOffset)) {
+                timestampOffset = rowTime.getTime();
+            }
+        }
+    }
+
+    /**
+     * Initializes the data source, creates the offset maps for the configured columns and
+     * loads the database schema.
+     *
+     * @throws Exception if the data source cannot be initialized or the schema cannot be
+     *         loaded; failures are no longer swallowed, because continuing with a missing
+     *         data source only fails later and less clearly
+     */
+    public void start() throws Exception {
+        initDataSource();
+        if (dataSource == null) {
+            throw new IllegalStateException("JDBC data source was not created; check the connection settings");
+        }
+        // One offset map per configured column, keyed by qualified table name.
+        if (incrementingColumn != null) {
+            incrementingMap = new HashMap<>();
+        }
+        if (timestampColumn != null) {
+            timestampMap = new HashMap<>();
+        }
+        schema = new Schema(dataSource);
+        schema.load();
+    }
+
+    /**
+     * Reads the connection settings and the column names from the configuration and creates
+     * the Druid connection pool.
+     *
+     * @throws Exception if the pool cannot be created; the failure is propagated instead of
+     *         being logged and followed by a misleading success message
+     */
+    private void initDataSource() throws Exception {
+        config = super.getConfig();
+        timestampColumn = config.getTimestampColmnName();
+        incrementingColumn = config.getIncrementingColumnName();
+
+        // NOTE(review): verifyServerCertificate=false disables certificate validation, and the
+        // server time zone is fixed to GMT+8 - confirm both are intended outside development.
+        String url = "jdbc:mysql://" + config.jdbcUrl
+            + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8";
+
+        Map<String, String> map = new HashMap<>();
+        map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
+        map.put("url", url);
+        map.put("username", config.jdbcUsername);
+        map.put("password", config.jdbcPassword);
+        map.put("initialSize", "2");
+        map.put("maxActive", "2");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+        // Log only non-secret settings; the full map contains the database password.
+        log.info("datasource config read successfully, url={}, username={}", url, config.jdbcUsername);
+
+        dataSource = DruidDataSourceFactory.createDataSource(map);
+        log.info("datasource created successfully");
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
index f9c8c6f..429494b 100644
--- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
@@ -16,29 +16,88 @@
*/
package org.apache.rocketmq.connect.jdbc.connector;
+
import java.util.Collection;
-import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.sql.DataSource;
+import org.junit.Test;
+import java.sql.*;
+import com.alibaba.druid.pool.DruidDataSourceFactory;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.internal.DefaultKeyValue;
public class JdbcSourceTaskTest {
+ KeyValue kv;
+ DataSource dataSource;
+
+ @Test
+ public void testBulk() throws InterruptedException {
+ KeyValue kv = new DefaultKeyValue();
+ kv.put("jdbcUrl", "localhost:3306");
+ kv.put("jdbcUsername", "root");
+ kv.put("jdbcPassword", "199812160");
+ kv.put("mode", "bulk");
+ kv.put("rocketmqTopic", "JdbcTopic");
+ JdbcSourceTask task = new JdbcSourceTask();
+ task.start(kv);
+ Collection<SourceDataEntry> sourceDataEntry = task.poll();
+ System.out.println(sourceDataEntry);
+ }
+
+ @Test
+ public void testTimestampIncrementing() throws InterruptedException, SQLException {
+ kv = new DefaultKeyValue();
+ kv.put("jdbcUrl", "localhost:3306");
+ kv.put("jdbcUsername", "root");
+ kv.put("jdbcPassword", "199812160");
+ kv.put("incrementingColumnName", "id");
+ kv.put("timestampColmnName", "timestamp");
+ kv.put("mode", "incrementing+timestamp");
+ kv.put("rocketmqTopic", "JdbcTopic");
+ JdbcSourceTask task = new JdbcSourceTask();
+ task.start(kv);
+ Collection<SourceDataEntry> sourceDataEntry = task.poll();
+ System.out.println(sourceDataEntry);
+ Map<String, String> map = new HashMap<>();
+ map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
+ map.put("url", "jdbc:mysql://" + kv.getString("jdbcUrl")
+ + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+ map.put("username", kv.getString("jdbcUsername"));
+ map.put("password", kv.getString("jdbcPassword"));
+ map.put("initialSize", "2");
+ map.put("maxActive", "2");
+ map.put("maxWait", "60000");
+ map.put("timeBetweenEvictionRunsMillis", "60000");
+ map.put("minEvictableIdleTimeMillis", "300000");
+ map.put("validationQuery", "SELECT 1 FROM DUAL");
+ map.put("testWhileIdle", "true");
+ try {
+ dataSource = DruidDataSourceFactory.createDataSource(map);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Connection connection= dataSource.getConnection();
+ PreparedStatement statement;
+ String s="insert into time_db.timestamp_tb (name) values(\"test\")";
+ statement=connection.prepareStatement(s);
+ statement.executeUpdate();
- @Test
- public void test() throws InterruptedException {
- KeyValue kv = new DefaultKeyValue();
- kv.put("jdbcUrl","localhost:3306");
- kv.put("jdbcUsername","root");
- kv.put("jdbcPassword","199812160");
- kv.put("mode","bulk");
- kv.put("rocketmqTopic","JdbcTopic");
- JdbcSourceTask task = new JdbcSourceTask();
- task.start(kv);
- Collection<SourceDataEntry> sourceDataEntry = task.poll();
- System.out.println(sourceDataEntry);
-
- }
+ sourceDataEntry = task.poll();
+ System.out.println(sourceDataEntry);
+ s="update time_db.timestamp_tb set name=\"liu\" where id < 2";
+ statement=connection.prepareStatement(s);
+ statement.executeUpdate();
+ sourceDataEntry = task.poll();
+ System.out.println(sourceDataEntry);
+ task.stop();
+
+ connection.close();
+ }
}