You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:34 UTC
[rocketmq-connect] 26/43: [ISSUE #485] Support repeat consumption (#486)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 42cbfb306735e8246f16b5e9f4be06b65cfad366
Author: chenyi19851209 <40...@qq.com>
AuthorDate: Thu Dec 19 09:22:16 2019 +0800
[ISSUE #485] Support repeat consumption (#486)
---
.../apache/rocketmq/connect/jdbc/sink/Updater.java | 74 ++++++++++++++++++++--
1 file changed, 67 insertions(+), 7 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
index 3571852..e30c65f 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
@@ -32,18 +32,35 @@ public class Updater {
public boolean push(String dbName, String tableName, Map<Field, Object[]> fieldMap, EntryType entryType) {
Boolean isSuccess = false;
- int id = 0;
+ int beforeUpdateId = 0;
+ int afterUpdateId = 0;
switch (entryType) {
case CREATE:
- isSuccess = updateRow(dbName, tableName, fieldMap, id);
+ afterUpdateId = queryAfterUpdateRowId(dbName, tableName, fieldMap);
+ if (afterUpdateId != 0){
+ isSuccess = true;
+ break;
+ }
+ isSuccess = updateRow(dbName, tableName, fieldMap, beforeUpdateId);
break;
case UPDATE:
- id = queryRowId(dbName, tableName, fieldMap);
- isSuccess = updateRow(dbName, tableName, fieldMap, id);
+ afterUpdateId = queryAfterUpdateRowId(dbName, tableName, fieldMap);
+ if (afterUpdateId != 0){
+ isSuccess = true;
+ // 再查原有数据是否存在,存在则删除
+ beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap);
+ if (beforeUpdateId != 0){
+ isSuccess = deleteRow(dbName, tableName, beforeUpdateId);
+ }
+ break;
+ }
+
+ beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap);
+ isSuccess = updateRow(dbName, tableName, fieldMap, beforeUpdateId);
break;
case DELETE:
- id = queryRowId(dbName, tableName, fieldMap);
- isSuccess = deleteRow(dbName, tableName, id);
+ beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap);
+ isSuccess = deleteRow(dbName, tableName, beforeUpdateId);
break;
default:
log.error("entryType {} is illegal.", entryType.toString());
@@ -85,7 +102,7 @@ public class Updater {
return sql;
}
- private Integer queryRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
+ private Integer queryBeforeUpdateRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
int count = 0, id = 0;
ResultSet rs;
PreparedStatement stmt;
@@ -128,6 +145,49 @@ public class Updater {
return id;
}
+ private Integer queryAfterUpdateRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) { // Returns the id of a row in dbName.tableName whose non-id columns match element [1] of each field's value array, or 0 when nothing is read.
+ int count = 0, id = 0; // count = number of map entries visited so far; id stays 0 unless the query yields a row
+ ResultSet rs;
+ PreparedStatement stmt;
+ Boolean finishQuery = false; // set to true once the query has been executed and its rows consumed
+ String query = "select id from " + dbName + "." + tableName + " where "; // NOTE(review): SQL text is assembled by concatenation, not bound parameters
+
+ for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+ count ++; // incremented before the id check below, so a skipped "id" entry is still counted
+ String fieldName = entry.getKey().getName();
+ FieldType fieldType = entry.getKey().getType();
+ Object fieldValue = entry.getValue()[1]; // presumably the post-update value, judging by the method name - confirm against the producer of fieldMap
+ if ("id".equals(fieldName)) // the id column is left out of the match condition
+ continue;
+ if (count != 1) { // NOTE(review): if "id" is the first entry, the next field still gets a leading " and ", giving "where  and ..." - confirm map ordering
+ query += " and ";
+ }
+ if (fieldValue == null)
+ {
+ query += fieldName + " is NULL"; // null values are matched with IS NULL
+ } else {
+ query = typeParser(fieldType, fieldName, fieldValue, query); // typeParser is defined outside this view; presumably appends a predicate for this field - confirm
+ }
+ }
+
+ try {
+ while (!connection.isClosed() && !finishQuery){ // skipped entirely when the connection is already closed, in which case 0 is returned
+ stmt = connection.prepareStatement(query); // NOTE(review): stmt is never closed in this method
+ rs = stmt.executeQuery();
+ if (rs != null) {
+ while (rs.next()) {
+ id = rs.getInt("id"); // when several rows match, the id of the last row read is kept
+ }
+ finishQuery = true;
+ rs.close(); // only reached on the success path; not closed if an SQLException is thrown earlier
+ }
+ }
+ } catch (SQLException e) {
+ log.error("query table error,{}", e); // NOTE(review): the exception is the only argument after the format string; SLF4J-style loggers would treat it as the throwable and leave {} unfilled - confirm logger type
+ }
+ return id; // 0 means no matching row was read (no match, closed connection, or a caught SQLException)
+ }
+
private Boolean updateRow(String dbName, String tableName, Map<Field, Object[]> fieldMap, Integer id) {
int count = 0;
PreparedStatement stmt;