You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by "sandynz (via GitHub)" <gi...@apache.org> on 2023/03/15 12:51:04 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24440: Improve CDC protocol

sandynz commented on code in PR #24440:
URL: https://github.com/apache/shardingsphere/pull/24440#discussion_r1136928007


##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java:
##########
@@ -98,19 +128,63 @@ public void write(final Record record) throws Exception {
         }
     }
     
-    private Optional<String> buildSQL(final Record record) {
+    private String buildKey(final String schema, final String tableName) {
+        return schema.isEmpty() ? tableName : String.join(".", schema, tableName);
+    }
+    
+    private StandardTableMetaData loadTableMetaData(final String schema, final String tableName) {
+        StandardTableMetaData result = tableMetaDataMap.get(buildKey(schema, tableName));
+        if (null != result) {
+            return result;
+        }
+        result = loader.getTableMetaData(Strings.emptyToNull(schema), tableName);
+        tableMetaDataMap.put(buildKey(schema, tableName), result);
+        return result;
+    }
+    
+    private Optional<String> buildSQL(final Record record, final List<String> uniqueKeyNames) {
         switch (record.getDataChangeType()) {
             case INSERT:
-                return Optional.ofNullable(sqlBuilder.buildInsertSQL(record));
+                return Optional.ofNullable(sqlBuilder.buildInsertSQL(record, uniqueKeyNames));
             case UPDATE:
-                return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record));
+                return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record, uniqueKeyNames));
             case DELETE:
-                return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record));
+                return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record, uniqueKeyNames));
             default:
                 return Optional.empty();
         }
     }
     
+    private Object convertValueFromAny(final StandardTableMetaData tableMetaData, final TableColumn tableColumn) {
+        StandardColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(tableColumn.getName());
+        Object object;
+        try {
+            object = AnyValueConvert.convertToObject(tableColumn.getValue());

Review Comment:
   `AnyValueConvert` could be `ProtobufAnyValueConverter`



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java:
##########
@@ -98,19 +128,63 @@ public void write(final Record record) throws Exception {
         }
     }
     
-    private Optional<String> buildSQL(final Record record) {
+    private String buildKey(final String schema, final String tableName) {
+        return schema.isEmpty() ? tableName : String.join(".", schema, tableName);
+    }
+    
+    private StandardTableMetaData loadTableMetaData(final String schema, final String tableName) {
+        StandardTableMetaData result = tableMetaDataMap.get(buildKey(schema, tableName));
+        if (null != result) {
+            return result;
+        }
+        result = loader.getTableMetaData(Strings.emptyToNull(schema), tableName);
+        tableMetaDataMap.put(buildKey(schema, tableName), result);
+        return result;
+    }
+    
+    private Optional<String> buildSQL(final Record record, final List<String> uniqueKeyNames) {
         switch (record.getDataChangeType()) {
             case INSERT:
-                return Optional.ofNullable(sqlBuilder.buildInsertSQL(record));
+                return Optional.ofNullable(sqlBuilder.buildInsertSQL(record, uniqueKeyNames));
             case UPDATE:
-                return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record));
+                return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record, uniqueKeyNames));
             case DELETE:
-                return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record));
+                return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record, uniqueKeyNames));
             default:
                 return Optional.empty();
         }
     }
     
+    private Object convertValueFromAny(final StandardTableMetaData tableMetaData, final TableColumn tableColumn) {
+        StandardColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(tableColumn.getName());
+        Object object;

Review Comment:
   `object` could be `result`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org