You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/09/23 05:10:14 UTC
[shardingsphere] branch master updated: modify decoding plugin to
mppdb_decoding and repair actual table name issue (#12621)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 44ac06d modify decoding plugin to mppdb_decoding and repair actual table name issue (#12621)
44ac06d is described below
commit 44ac06d368395b80d08ecaaee7f709f4712bd1b7
Author: justbk2015 <24...@qq.com>
AuthorDate: Thu Sep 23 13:09:39 2021 +0800
modify decoding plugin to mppdb_decoding and repair actual table name issue (#12621)
Co-authored-by: justbk2015 <24...@qq.com>
---
.../job/preparer/AbstractDataSourcePreparer.java | 4 ++
.../opengauss/component/OpenGaussWalDumper.java | 3 ++
.../checker/OpenGaussDataSourcePreparer.java | 46 +++++++++++++++++++---
.../opengauss/wal/OpenGaussLogicalReplication.java | 4 +-
4 files changed, 49 insertions(+), 8 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
index 661733b..c00e3f0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
@@ -112,6 +112,10 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
log.info("execute target table sql: {}", sql);
try (Statement statement = targetConnection.createStatement()) {
statement.execute(sql);
+ } catch (final SQLException ex) {
+ if (!ex.getMessage().contains("multiple primary keys for table")) {
+ throw ex;
+ }
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
index 3c6cdae..89d77b8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
@@ -116,6 +116,9 @@ public final class OpenGaussWalDumper extends AbstractScalingExecutor implements
pushRecord(record);
}
} catch (final SQLException ex) {
+ if (ex.getMessage().contains("is already active")) {
+ return;
+ }
throw new ScalingTaskExecuteException(ex);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
index f6f791c..97c8619 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
@@ -34,6 +34,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
@@ -44,6 +45,8 @@ import java.util.stream.Collectors;
@Slf4j
public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePreparer {
+ private static final String WITH_OF_TABLE_EXTEND = "with (";
+
@Override
public void prepareTargetTables(final JobConfiguration jobConfig) {
Collection<ActualTableDefinition> actualTableDefinitions;
@@ -73,10 +76,10 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
try (DataSourceWrapper dataSource = new DataSourceWrapper(entry.getKey());
Connection sourceConnection = dataSource.getConnection()) {
for (Entry<String, String> tableNameEntry : entry.getValue().entrySet()) {
- String actualTableName = tableNameEntry.getValue();
+ String actualTableName = tableNameEntry.getKey();
int oid = queryTableOid(sourceConnection, actualTableName);
String tableDefinition = queryTableDefinition(sourceConnection, oid);
- String logicTableName = tableNameEntry.getKey();
+ String logicTableName = tableNameEntry.getValue();
result.add(new ActualTableDefinition(logicTableName, actualTableName, tableDefinition));
}
}
@@ -123,18 +126,49 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
case CREATE_TABLE:
sql = addIfNotExistsForCreateTableSQL(sql);
sql = replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
+ sql = skipCreateTableExtendSet(sql);
return sql;
case ALTER_TABLE:
sql = replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
return sql;
- case UNKNOWN:
- return sql;
default:
- return sql;
+ return "";
}
- }).collect(Collectors.toList());
+ }).filter(sql -> !"".equals(sql)).collect(Collectors.toList());
result.put(each.getLogicTableName(), logicTableSQLs);
}
return result;
}
+
+ @Override
+ protected String replaceActualTableNameToLogicTableName(final String createOrAlterTableSQL, final String actualTableName, final String logicTableName) {
+ StringBuilder logicalTableSQL = new StringBuilder(createOrAlterTableSQL);
+ while (true) {
+ int start = logicalTableSQL.indexOf(actualTableName);
+ if (start <= 0) {
+ return logicalTableSQL.toString();
+ }
+ int end = start + actualTableName.length();
+ logicalTableSQL.replace(start, end, logicTableName);
+ }
+ }
+
+ private String skipCreateTableExtendSet(final String createSQL) {
+ String lowerCreateSQL = createSQL.toLowerCase();
+ String[] search = {WITH_OF_TABLE_EXTEND, ")"};
+ List<Integer> searchPos = new ArrayList<>(2);
+ int startPos = 0;
+ for (String each: search) {
+ int curSearch = lowerCreateSQL.indexOf(each, startPos);
+ if (curSearch <= 0) {
+ break;
+ }
+ searchPos.add(curSearch);
+ startPos = curSearch;
+ }
+ if (searchPos.size() != search.length) {
+ return createSQL;
+ }
+ return createSQL.substring(0, searchPos.get(0)) + createSQL.substring(searchPos.get(1) + 1);
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
index 6fcaa31..74e4d55 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
@@ -40,7 +40,7 @@ public final class OpenGaussLogicalReplication {
public static final String SLOT_NAME = "sharding_scaling";
- public static final String DECODE_PLUGIN = "test_decoding";
+ public static final String DECODE_PLUGIN = "mppdb_decoding";
public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
@@ -92,7 +92,7 @@ public final class OpenGaussLogicalReplication {
*/
public static void createIfNotExists(final Connection conn) throws SQLException {
if (isSlotNameExist(conn)) {
- dropSlot(conn);
+ return;
}
createSlotBySql(conn);
}