You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/09/23 05:10:14 UTC

[shardingsphere] branch master updated: modify decoding plugin to mppdb_decoding and repair actual table name issue (#12621)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 44ac06d  modify decoding plugin to mppdb_decoding and repair actual table name issue (#12621)
44ac06d is described below

commit 44ac06d368395b80d08ecaaee7f709f4712bd1b7
Author: justbk2015 <24...@qq.com>
AuthorDate: Thu Sep 23 13:09:39 2021 +0800

    modify decoding plugin to mppdb_decoding and repair actual table name issue (#12621)
    
    Co-authored-by: justbk2015 <24...@qq.com>
---
 .../job/preparer/AbstractDataSourcePreparer.java   |  4 ++
 .../opengauss/component/OpenGaussWalDumper.java    |  3 ++
 .../checker/OpenGaussDataSourcePreparer.java       | 46 +++++++++++++++++++---
 .../opengauss/wal/OpenGaussLogicalReplication.java |  4 +-
 4 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
index 661733b..c00e3f0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
@@ -112,6 +112,10 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         log.info("execute target table sql: {}", sql);
         try (Statement statement = targetConnection.createStatement()) {
             statement.execute(sql);
+        } catch (final SQLException ex) {
+            if (!ex.getMessage().contains("multiple primary keys for table")) {
+                throw ex;
+            }
         }
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
index 3c6cdae..89d77b8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
@@ -116,6 +116,9 @@ public final class OpenGaussWalDumper extends AbstractScalingExecutor implements
                 pushRecord(record);
             }
         } catch (final SQLException ex) {
+            if (ex.getMessage().contains("is already active")) {
+                return;
+            }
             throw new ScalingTaskExecuteException(ex);
         }
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
index f6f791c..97c8619 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
@@ -34,6 +34,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
@@ -44,6 +45,8 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePreparer {
     
+    private static final String WITH_OF_TABLE_EXTEND = "with (";
+    
     @Override
     public void prepareTargetTables(final JobConfiguration jobConfig) {
         Collection<ActualTableDefinition> actualTableDefinitions;
@@ -73,10 +76,10 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
             try (DataSourceWrapper dataSource = new DataSourceWrapper(entry.getKey());
                  Connection sourceConnection = dataSource.getConnection()) {
                 for (Entry<String, String> tableNameEntry : entry.getValue().entrySet()) {
-                    String actualTableName = tableNameEntry.getValue();
+                    String actualTableName = tableNameEntry.getKey();
                     int oid = queryTableOid(sourceConnection, actualTableName);
                     String tableDefinition = queryTableDefinition(sourceConnection, oid);
-                    String logicTableName = tableNameEntry.getKey();
+                    String logicTableName = tableNameEntry.getValue();
                     result.add(new ActualTableDefinition(logicTableName, actualTableName, tableDefinition));
                 }
             }
@@ -123,18 +126,49 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
                     case CREATE_TABLE:
                         sql = addIfNotExistsForCreateTableSQL(sql);
                         sql = replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
+                        sql = skipCreateTableExtendSet(sql);
                         return sql;
                     case ALTER_TABLE:
                         sql = replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
                         return sql;
-                    case UNKNOWN:
-                        return sql;
                     default:
-                        return sql;
+                        return "";
                 }
-            }).collect(Collectors.toList());
+            }).filter(sql -> !"".equals(sql)).collect(Collectors.toList());
             result.put(each.getLogicTableName(), logicTableSQLs);
         }
         return result;
     }
+    
+    @Override
+    protected String replaceActualTableNameToLogicTableName(final String createOrAlterTableSQL, final String actualTableName, final String logicTableName) {
+        StringBuilder logicalTableSQL = new StringBuilder(createOrAlterTableSQL);
+        while (true) {
+            int start = logicalTableSQL.indexOf(actualTableName);
+            if (start <= 0) {
+                return logicalTableSQL.toString();
+            }
+            int end = start + actualTableName.length();
+            logicalTableSQL.replace(start, end, logicTableName);
+        }
+    }
+    
+    private String skipCreateTableExtendSet(final String createSQL) {
+        String lowerCreateSQL = createSQL.toLowerCase();
+        String[] search = {WITH_OF_TABLE_EXTEND, ")"};
+        List<Integer> searchPos = new ArrayList<>(2);
+        int startPos = 0;
+        for (String each: search) {
+            int curSearch = lowerCreateSQL.indexOf(each, startPos);
+            if (curSearch <= 0) {
+                break;
+            }
+            searchPos.add(curSearch);
+            startPos = curSearch;
+        }
+        if (searchPos.size() != search.length) {
+            return createSQL;
+        }
+        return createSQL.substring(0, searchPos.get(0)) + createSQL.substring(searchPos.get(1) + 1);
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
index 6fcaa31..74e4d55 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
@@ -40,7 +40,7 @@ public final class OpenGaussLogicalReplication {
 
     public static final String SLOT_NAME = "sharding_scaling";
 
-    public static final String DECODE_PLUGIN = "test_decoding";
+    public static final String DECODE_PLUGIN = "mppdb_decoding";
 
     public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
 
@@ -92,7 +92,7 @@ public final class OpenGaussLogicalReplication {
      */
     public static void createIfNotExists(final Connection conn) throws SQLException {
         if (isSlotNameExist(conn)) {
-            dropSlot(conn);
+            return;
         }
         createSlotBySql(conn);
     }