You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:34 UTC

[rocketmq-connect] 26/43: [ISSUE #485] Support repeat consumption (#486)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 42cbfb306735e8246f16b5e9f4be06b65cfad366
Author: chenyi19851209 <40...@qq.com>
AuthorDate: Thu Dec 19 09:22:16 2019 +0800

    [ISSUE #485] Support repeat consumption (#486)
---
 .../apache/rocketmq/connect/jdbc/sink/Updater.java | 74 ++++++++++++++++++++--
 1 file changed, 67 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
index 3571852..e30c65f 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
@@ -32,18 +32,35 @@ public class Updater {
 
     public boolean push(String dbName, String tableName, Map<Field, Object[]> fieldMap, EntryType entryType) {
         Boolean isSuccess = false;
-        int id = 0;
+        int beforeUpdateId = 0;
+        int afterUpdateId = 0;
         switch (entryType) {
             case CREATE:
-                isSuccess = updateRow(dbName, tableName, fieldMap, id);
+                afterUpdateId = queryAfterUpdateRowId(dbName, tableName, fieldMap);
+                if (afterUpdateId != 0){
+                    isSuccess = true;
+                    break;
+                }
+                isSuccess = updateRow(dbName, tableName, fieldMap, beforeUpdateId);
                 break;
             case UPDATE:
-                id = queryRowId(dbName, tableName, fieldMap);
-                isSuccess = updateRow(dbName, tableName, fieldMap, id);
+                afterUpdateId = queryAfterUpdateRowId(dbName, tableName, fieldMap);
+                if (afterUpdateId != 0){
+                    isSuccess = true;
+                    // Then check whether the original (pre-update) row still exists; if so, delete it
+                    beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap);
+                    if (beforeUpdateId != 0){
+                       isSuccess = deleteRow(dbName, tableName, beforeUpdateId);
+                    }
+                    break;
+                }
+
+                beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap);
+                isSuccess = updateRow(dbName, tableName, fieldMap, beforeUpdateId);
                 break;
             case DELETE:
-                id = queryRowId(dbName, tableName, fieldMap);
-                isSuccess = deleteRow(dbName, tableName, id);
+                beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap);
+                isSuccess = deleteRow(dbName, tableName, beforeUpdateId);
                 break;
             default:
                 log.error("entryType {} is illegal.", entryType.toString());
@@ -85,7 +102,7 @@ public class Updater {
         return sql;
     }
 
-    private Integer queryRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
+    private Integer queryBeforeUpdateRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
         int count = 0, id = 0;
         ResultSet rs;
         PreparedStatement stmt;
@@ -128,6 +145,49 @@ public class Updater {
         return id;
     }
 
+    /**
+     * Looks up the id of a row whose non-id columns equal the index-1 (post-update, per the name) values.
+     * Returns 0 when no row matches, no condition can be built, the connection is closed, or the query fails.
+     */
+    private Integer queryAfterUpdateRowId(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
+        int id = 0;
+        boolean hasCondition = false;
+        String query = "select id from " + dbName + "." + tableName + " where ";
+
+        for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+            String fieldName = entry.getKey().getName();
+            if ("id".equals(fieldName)) {
+                continue;
+            }
+            // Join on emitted conditions, so a leading "id" entry cannot produce "where  and ...".
+            if (hasCondition) {
+                query += " and ";
+            }
+            hasCondition = true;
+            Object fieldValue = entry.getValue()[1];
+            if (fieldValue == null) {
+                query += fieldName + " is NULL";
+            } else {
+                query = typeParser(entry.getKey().getType(), fieldName, fieldValue, query);
+            }
+        }
+
+        try {
+            // Skip the query when there is nothing to match on or the connection is already closed.
+            if (!hasCondition || connection.isClosed()) {
+                return id;
+            }
+            try (PreparedStatement stmt = connection.prepareStatement(query); ResultSet rs = stmt.executeQuery()) {
+                while (rs.next()) {
+                    id = rs.getInt("id");
+                }
+            }
+        } catch (SQLException e) {
+            log.error("query table error,{}", e);
+        }
+        return id;
+    }
+
     private Boolean updateRow(String dbName, String tableName, Map<Field, Object[]> fieldMap, Integer id) {
         int count = 0;
         PreparedStatement stmt;