You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/17 23:46:46 UTC
[shardingsphere] branch master updated: Scaling prepare tables part
3 : Implement openGauss dialect (#12523)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6ce6163 Scaling prepare tables part 3 : Implement openGauss dialect (#12523)
6ce6163 is described below
commit 6ce616322361d187ffddd1685618bd17fc35f694
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Sep 18 07:46:11 2021 +0800
Scaling prepare tables part 3 : Implement openGauss dialect (#12523)
---
.../shardingsphere-scaling-bootstrap/pom.xml | 1 -
.../job/preparer/AbstractDataSourcePreparer.java | 76 ++++++++++-
.../core/job/preparer/ActualTableDefinition.java} | 36 +++---
.../core/job/preparer/TableDefinitionSQLType.java} | 28 ++---
.../scaling/core/util/JobConfigurationUtil.java | 25 +++-
.../preparer/AbstractDataSourcePreparerTest.java | 46 +++++--
.../component/checker/MySQLDataSourcePreparer.java | 7 +-
.../scaling/opengauss/OpenGaussScalingEntry.java | 6 +-
.../checker/OpenGaussDataSourcePreparer.java | 140 +++++++++++++++++++++
.../checker/OpenGaussEnvironmentChecker.java} | 15 ++-
.../opengauss/OpenGaussScalingEntryTest.java | 4 +-
.../checker/PostgreSQLEnvironmentChecker.java | 3 +
12 files changed, 314 insertions(+), 73 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
index 23819e6..4ce7829 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
@@ -44,7 +44,6 @@
<artifactId>shardingsphere-scaling-postgresql</artifactId>
<version>${project.version}</version>
</dependency>
-
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-scaling-opengauss</artifactId>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
index 379fc3b..661733b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
@@ -18,21 +18,33 @@
package org.apache.shardingsphere.scaling.core.job.preparer;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
+import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceFactory;
import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceWrapper;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.yaml.ShardingRuleConfigurationSwapper;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -46,6 +58,8 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
+ private static final Pattern PATTERN_ALTER_TABLE = Pattern.compile("ALTER\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
+
private final DataSourceFactory dataSourceFactory = new DataSourceFactory();
protected DataSourceWrapper getSourceDataSource(final JobConfiguration jobConfig) {
@@ -56,10 +70,14 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
return dataSourceFactory.newInstance(jobConfig.getRuleConfig().getTarget().unwrap());
}
- protected List<String> getLogicTableNames(final ScalingDataSourceConfiguration sourceConfig) {
- List<String> result = new ArrayList<>();
+ protected Collection<String> getLogicTableNames(final ScalingDataSourceConfiguration sourceConfig) {
ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
ShardingRuleConfiguration ruleConfig = ShardingRuleConfigurationSwapper.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
+ return getLogicTableNames(ruleConfig);
+ }
+
+ private Collection<String> getLogicTableNames(final ShardingRuleConfiguration ruleConfig) {
+ Collection<String> result = new ArrayList<>();
List<String> tableNames = ruleConfig.getTables().stream().map(ShardingTableRuleConfiguration::getLogicTable).collect(Collectors.toList());
List<String> autoTableNames = ruleConfig.getAutoTables().stream().map(ShardingAutoTableRuleConfiguration::getLogicTable).collect(Collectors.toList());
result.addAll(tableNames);
@@ -67,18 +85,64 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
return result;
}
- protected void createTargetTable(final Connection targetConnection, final String createTableSQL) throws SQLException {
- String sql = addIfNotExistsForCreateTableSQL(createTableSQL);
- log.info("create target table, sql: {}", sql);
+ /**
+ * Get data source table names map.
+ *
+ * @param sourceConfig source data source configuration
+ * @return data source table names map. map(data source, map(first actual table name of logic table, logic table name)).
+ */
+ protected Map<DataSource, Map<String, String>> getDataSourceTableNamesMap(final ScalingDataSourceConfiguration sourceConfig) {
+ ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
+ ShardingRuleConfiguration ruleConfig = ShardingRuleConfigurationSwapper.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
+ Map<String, DataSourceConfiguration> dataSourceConfigs = JobConfigurationUtil.getDataSourceConfigurations(source.getRootConfig());
+ Map<String, DataSource> dataSourceMap = dataSourceConfigs.entrySet().stream().collect(
+ Collectors.toMap(Entry::getKey, entry -> new DataSourceWrapper(null), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+ ShardingRule shardingRule = new ShardingRule(ruleConfig, dataSourceMap);
+ Collection<String> logicTableNames = getLogicTableNames(ruleConfig);
+ Map<String, Map<String, String>> dataSourceNameTableNamesMap = new HashMap<>();
+ for (String each : logicTableNames) {
+ DataNode dataNode = shardingRule.getDataNode(each);
+ dataSourceNameTableNamesMap.computeIfAbsent(dataNode.getDataSourceName(), key -> new LinkedHashMap<>()).put(dataNode.getTableName(), each);
+ }
+ return dataSourceNameTableNamesMap.entrySet().stream().collect(
+ Collectors.toMap(entry -> DataSourceConverter.getDataSource(dataSourceConfigs.get(entry.getKey())), Entry::getValue, (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+ }
+
+ protected void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
+ log.info("execute target table sql: {}", sql);
try (Statement statement = targetConnection.createStatement()) {
statement.execute(sql);
}
}
- private String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
+ protected Collection<String> splitTableDefinitionToSQLs(final ActualTableDefinition actualTableDefinition) {
+ return Arrays.stream(actualTableDefinition.getTableDefinition().split(";")).collect(Collectors.toList());
+ }
+
+ //TODO simple lexer
+ protected TableDefinitionSQLType getTableDefinitionSQLType(final String sql) {
+ if (PATTERN_CREATE_TABLE.matcher(sql).find()) {
+ return TableDefinitionSQLType.CREATE_TABLE;
+ }
+ if (PATTERN_ALTER_TABLE.matcher(sql).find()) {
+ return TableDefinitionSQLType.ALTER_TABLE;
+ }
+ return TableDefinitionSQLType.UNKNOWN;
+ }
+
+ protected String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
if (PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find()) {
return createTableSQL;
}
return PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
}
+
+ protected String replaceActualTableNameToLogicTableName(final String createOrAlterTableSQL, final String actualTableName, final String logicTableName) {
+ int start = createOrAlterTableSQL.indexOf(actualTableName);
+ if (start <= 0) {
+ return createOrAlterTableSQL;
+ }
+ int end = start + actualTableName.length();
+ return new StringBuilder(createOrAlterTableSQL).replace(start, end, logicTableName).toString();
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ActualTableDefinition.java
similarity index 52%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ActualTableDefinition.java
index 86a2f03..fa6227f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ActualTableDefinition.java
@@ -15,26 +15,28 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.postgresql.component.checker;
+package org.apache.shardingsphere.scaling.core.job.preparer;
-import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
-import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
-public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
+/**
+ * Actual table definition.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class ActualTableDefinition {
- @Override
- public Class<PostgreSQLDataSourceChecker> getDataSourceCheckerClass() {
- return PostgreSQLDataSourceChecker.class;
- }
+ private final String logicTableName;
- @Override
- public Class<PostgreSQLDataConsistencyChecker> getDataConsistencyCheckerClass() {
- return PostgreSQLDataConsistencyChecker.class;
- }
+ private final String actualTableName;
- @Override
- public Class<? extends DataSourcePreparer> getDataSourcePreparerClass() {
- //TODO
- return null;
- }
+ /**
+ * Plenty of actual table definition SQLs, separated with ';'.
+ * <p/>
+ * May be <code>CREATE TABLE</code>, <code>ALTER TABLE</code>, <code>TABLESPACE</code>, <code>SET search_path</code>, etc.
+ */
+ private final String tableDefinition;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/TableDefinitionSQLType.java
similarity index 51%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/TableDefinitionSQLType.java
index 86a2f03..09b4031 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/TableDefinitionSQLType.java
@@ -15,26 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.postgresql.component.checker;
+package org.apache.shardingsphere.scaling.core.job.preparer;
-import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
-import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
-
-public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
-
- @Override
- public Class<PostgreSQLDataSourceChecker> getDataSourceCheckerClass() {
- return PostgreSQLDataSourceChecker.class;
- }
-
- @Override
- public Class<PostgreSQLDataConsistencyChecker> getDataConsistencyCheckerClass() {
- return PostgreSQLDataConsistencyChecker.class;
- }
+/**
+ * Table definition SQL type.
+ */
+public enum TableDefinitionSQLType {
- @Override
- public Class<? extends DataSourcePreparer> getDataSourcePreparerClass() {
- //TODO
- return null;
- }
+ UNKNOWN,
+ CREATE_TABLE,
+ ALTER_TABLE,
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
index 2a03616..7454da0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
@@ -222,9 +222,10 @@ public final class JobConfigurationUtil {
List<TaskConfiguration> result = new LinkedList<>();
ShardingSphereJDBCDataSourceConfiguration sourceConfig = getSourceConfiguration(jobConfig);
ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationSwapper.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
- Map<String, DataSourceConfiguration> sourceDataSource = getDataSourceConfigurations(sourceConfig.getRootConfig().getDataSources());
+ Map<String, DataSourceConfiguration> sourceDataSource = getDataSourceConfigurations(sourceConfig.getRootConfig());
Map<String, DataSource> dataSourceMap = DataSourceConverter.getDataSourceMap(sourceDataSource);
Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(new ShardingRule(sourceRuleConfig, dataSourceMap));
+ closeDataSources(dataSourceMap.values());
Optional<ShardingRuleConfiguration> targetRuleConfig = getTargetRuleConfiguration(jobConfig);
filterByShardingDataSourceTables(dataSourceTableNameMap, jobConfig.getHandleConfig());
Map<String, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig));
@@ -242,12 +243,32 @@ public final class JobConfigurationUtil {
return (ShardingSphereJDBCDataSourceConfiguration) result;
}
- private static Map<String, DataSourceConfiguration> getDataSourceConfigurations(final Map<String, Map<String, Object>> yamlDataSourceConfigs) {
+ /**
+ * Get data source configurations.
+ *
+ * @param yamlRootConfiguration yaml root configuration
+ * @return data source name to data source configuration map
+ */
+ public static Map<String, DataSourceConfiguration> getDataSourceConfigurations(final YamlRootConfiguration yamlRootConfiguration) {
+ Map<String, Map<String, Object>> yamlDataSourceConfigs = yamlRootConfiguration.getDataSources();
Map<String, DataSourceConfiguration> result = new LinkedHashMap<>(yamlDataSourceConfigs.size());
yamlDataSourceConfigs.forEach((key, value) -> result.put(key, new YamlDataSourceConfigurationSwapper().swapToDataSourceConfiguration(value)));
return result;
}
+ private static void closeDataSources(final Collection<DataSource> dataSources) {
+ for (DataSource dataSource : dataSources) {
+ if (dataSource instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) dataSource).close();
+ //CHECKSTYLE:OFF
+ } catch (final Exception ignored) {
+ //CHECKSTYLE:ON
+ }
+ }
+ }
+ }
+
private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final JobConfiguration jobConfig) {
ScalingDataSourceConfiguration dataSourceConfig = jobConfig.getRuleConfig().getTarget().unwrap();
if (dataSourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/preparer/AbstractDataSourcePreparerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparerTest.java
similarity index 50%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/preparer/AbstractDataSourcePreparerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparerTest.java
index b180a4a..fc328c2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/preparer/AbstractDataSourcePreparerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparerTest.java
@@ -15,39 +15,59 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.preparer;
+package org.apache.shardingsphere.scaling.core.job.preparer;
-import lombok.SneakyThrows;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.job.preparer.AbstractDataSourcePreparer;
import org.junit.Test;
-import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public final class AbstractDataSourcePreparerTest {
private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);
+ private final AbstractDataSourcePreparer preparer = new AbstractDataSourcePreparer() {
+ @Override
+ public void prepareTargetTables(final JobConfiguration jobConfig) {
+ }
+ };
+
+ @Test
+ public void assertGetTableDefinitionSQLType() {
+ Collection<Pair<String, TableDefinitionSQLType>> pairs = new ArrayList<>();
+ pairs.add(Pair.of("SET search_path = public", TableDefinitionSQLType.UNKNOWN));
+ pairs.add(Pair.of("CREATE TABLE t1_0 (id int NOT NULL)", TableDefinitionSQLType.CREATE_TABLE));
+ pairs.add(Pair.of("ALTER TABLE t1_0 ADD CONSTRAINT t1_0_pkey PRIMARY KEY (id)", TableDefinitionSQLType.ALTER_TABLE));
+ for (Pair<String, TableDefinitionSQLType> each : pairs) {
+ TableDefinitionSQLType sqlType = preparer.getTableDefinitionSQLType(each.getKey());
+ assertThat(sqlType, is(each.getValue()));
+ }
+ }
+
@Test
- @SneakyThrows(ReflectiveOperationException.class)
public void assertAddIfNotExistsForCreateTableSQL() {
- Method method = AbstractDataSourcePreparer.class.getDeclaredMethod("addIfNotExistsForCreateTableSQL", String.class);
- method.setAccessible(true);
- AbstractDataSourcePreparer preparer = new AbstractDataSourcePreparer() {
- @Override
- public void prepareTargetTables(final JobConfiguration jobConfig) {
- }
- };
List<String> createTableSQLs = Arrays.asList("CREATE TABLE IF NOT EXISTS t (id int)", "CREATE TABLE t (id int)",
"CREATE TABLE IF \nNOT \tEXISTS t (id int)", "CREATE \tTABLE t (id int)");
for (String createTableSQL : createTableSQLs) {
- String sql = (String) method.invoke(preparer, createTableSQL);
+ String sql = preparer.addIfNotExistsForCreateTableSQL(createTableSQL);
assertTrue(PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(sql).find());
}
}
+
+ @Test
+ public void assertReplaceActualTableNameToLogicTableName() {
+ String sql = "ALTER TABLE t_order_0 ADD CONSTRAINT t_order_0_uniq UNIQUE (order_id)";
+ String expected = "ALTER TABLE t_order ADD CONSTRAINT t_order_0_uniq UNIQUE (order_id)";
+ String actual = preparer.replaceActualTableNameToLogicTableName(sql, "t_order_0", "t_order");
+ assertThat(actual, is(expected));
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
index af7f2fe..7a7e1d7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
@@ -28,8 +28,8 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collection;
import java.util.Collections;
-import java.util.List;
/**
* Data source preparer for MySQL.
@@ -45,10 +45,11 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
Connection sourceConnection = sourceDataSource.getConnection();
DataSourceWrapper targetDataSource = getTargetDataSource(jobConfig);
Connection targetConnection = targetDataSource.getConnection()) {
- List<String> logicTableNames = getLogicTableNames(jobConfig.getRuleConfig().getSource().unwrap());
+ Collection<String> logicTableNames = getLogicTableNames(jobConfig.getRuleConfig().getSource().unwrap());
for (String each : logicTableNames) {
String createTableSQL = getCreateTableSQL(sourceConnection, each);
- createTargetTable(targetConnection, createTableSQL);
+ createTableSQL = addIfNotExistsForCreateTableSQL(createTableSQL);
+ executeTargetTableSQL(targetConnection, createTableSQL);
log.info("create target table '{}' success", each);
}
} catch (final SQLException ex) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java
index 07f54b2..3a6f01d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java
@@ -22,8 +22,8 @@ import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussImporter;
import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussPositionInitializer;
import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussScalingSQLBuilder;
import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussWalDumper;
+import org.apache.shardingsphere.scaling.opengauss.component.checker.OpenGaussEnvironmentChecker;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLInventoryDumper;
-import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLEnvironmentChecker;
/**
* OpenGauss scaling entry.
@@ -51,8 +51,8 @@ public final class OpenGaussScalingEntry implements ScalingEntry {
}
@Override
- public Class<PostgreSQLEnvironmentChecker> getEnvironmentCheckerClass() {
- return PostgreSQLEnvironmentChecker.class;
+ public Class<OpenGaussEnvironmentChecker> getEnvironmentCheckerClass() {
+ return OpenGaussEnvironmentChecker.class;
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
new file mode 100644
index 0000000..f6f791c
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.component.checker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceWrapper;
+import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.job.preparer.AbstractDataSourcePreparer;
+import org.apache.shardingsphere.scaling.core.job.preparer.ActualTableDefinition;
+import org.apache.shardingsphere.scaling.core.job.preparer.TableDefinitionSQLType;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Data source preparer for openGauss.
+ */
+@Slf4j
+public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePreparer {
+
+ @Override
+ public void prepareTargetTables(final JobConfiguration jobConfig) {
+ Collection<ActualTableDefinition> actualTableDefinitions;
+ try {
+ actualTableDefinitions = getActualTableDefinitions(jobConfig);
+ } catch (final SQLException ex) {
+ throw new PrepareFailedException("get table definitions failed.", ex);
+ }
+ Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
+ try (DataSourceWrapper targetDataSource = getTargetDataSource(jobConfig);
+ Connection targetConnection = targetDataSource.getConnection()) {
+ for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
+ for (String each : entry.getValue()) {
+ executeTargetTableSQL(targetConnection, each);
+ }
+ log.info("create target table '{}' success", entry.getKey());
+ }
+ } catch (final SQLException ex) {
+ throw new PrepareFailedException("prepare target tables failed.", ex);
+ }
+ }
+
+ private Collection<ActualTableDefinition> getActualTableDefinitions(final JobConfiguration jobConfig) throws SQLException {
+ Collection<ActualTableDefinition> result = new ArrayList<>();
+ Map<DataSource, Map<String, String>> dataSourceTableNamesMap = getDataSourceTableNamesMap(jobConfig.getRuleConfig().getSource().unwrap());
+ for (Entry<DataSource, Map<String, String>> entry : dataSourceTableNamesMap.entrySet()) {
+ try (DataSourceWrapper dataSource = new DataSourceWrapper(entry.getKey());
+ Connection sourceConnection = dataSource.getConnection()) {
+ for (Entry<String, String> tableNameEntry : entry.getValue().entrySet()) {
+ String actualTableName = tableNameEntry.getValue();
+ int oid = queryTableOid(sourceConnection, actualTableName);
+ String tableDefinition = queryTableDefinition(sourceConnection, oid);
+ String logicTableName = tableNameEntry.getKey();
+ result.add(new ActualTableDefinition(logicTableName, actualTableName, tableDefinition));
+ }
+ }
+ }
+ return result;
+ }
+
+ private int queryTableOid(final Connection sourceConnection, final String actualTableName) throws SQLException {
+ String sql = "SELECT oid FROM pg_class WHERE relname = ?";
+ try (PreparedStatement statement = sourceConnection.prepareStatement(sql)) {
+ statement.setString(1, actualTableName);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (!resultSet.next()) {
+ throw new PrepareFailedException("select oid has no result, sql: " + sql + ", actualTableName: " + actualTableName);
+ }
+ return resultSet.getInt(1);
+ }
+ }
+ }
+
+ private String queryTableDefinition(final Connection sourceConnection, final int oid) throws SQLException {
+ String sql = String.format("SELECT * FROM pg_get_tabledef(%d)", oid);
+ try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) {
+ if (!resultSet.next()) {
+ throw new PrepareFailedException("table definition has no result, sql: " + sql);
+ }
+ return resultSet.getString(1);
+ }
+ }
+
+ /**
+ * Get create logic table SQLs.
+ *
+ * @param actualTableDefinitions actual table definitions. key is logic table name, value is actual table definition.
+ * @return all SQLs. key is logic table name, value is collection of logic table SQLs.
+ */
+ private Map<String, Collection<String>> getCreateLogicTableSQLs(final Collection<ActualTableDefinition> actualTableDefinitions) {
+ Map<String, Collection<String>> result = new HashMap<>();
+ for (ActualTableDefinition each : actualTableDefinitions) {
+ Collection<String> logicTableSQLs = splitTableDefinitionToSQLs(each).stream().map(sql -> {
+ TableDefinitionSQLType sqlType = getTableDefinitionSQLType(sql);
+ switch (sqlType) {
+ //TODO replace constraint and index name
+ case CREATE_TABLE:
+ sql = addIfNotExistsForCreateTableSQL(sql);
+ sql = replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
+ return sql;
+ case ALTER_TABLE:
+ sql = replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
+ return sql;
+ case UNKNOWN:
+ return sql;
+ default:
+ return sql;
+ }
+ }).collect(Collectors.toList());
+ result.put(each.getLogicTableName(), logicTableSQLs);
+ }
+ return result;
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussEnvironmentChecker.java
similarity index 69%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussEnvironmentChecker.java
index 86a2f03..a44a1b0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussEnvironmentChecker.java
@@ -15,12 +15,16 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.postgresql.component.checker;
+package org.apache.shardingsphere.scaling.opengauss.component.checker;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
-import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
+import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataConsistencyChecker;
+import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataSourceChecker;
-public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
+/**
+ * Environment checker for openGauss.
+ */
+public final class OpenGaussEnvironmentChecker implements EnvironmentChecker {
@Override
public Class<PostgreSQLDataSourceChecker> getDataSourceCheckerClass() {
@@ -33,8 +37,7 @@ public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
}
@Override
- public Class<? extends DataSourcePreparer> getDataSourcePreparerClass() {
- //TODO
- return null;
+ public Class<OpenGaussDataSourcePreparer> getDataSourcePreparerClass() {
+ return OpenGaussDataSourcePreparer.class;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java
index cd1a377..777d29d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java
@@ -22,10 +22,10 @@ import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussImporter;
import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussPositionInitializer;
import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussWalDumper;
+import org.apache.shardingsphere.scaling.opengauss.component.checker.OpenGaussEnvironmentChecker;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataConsistencyChecker;
import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataSourceChecker;
-import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLEnvironmentChecker;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -39,7 +39,7 @@ public final class OpenGaussScalingEntryTest {
ScalingEntry scalingEntry = ScalingEntryLoader.getInstance("openGauss");
assertTrue(scalingEntry instanceof OpenGaussScalingEntry);
assertThat(scalingEntry.getPositionInitializerClass(), equalTo(OpenGaussPositionInitializer.class));
- assertThat(scalingEntry.getEnvironmentCheckerClass(), equalTo(PostgreSQLEnvironmentChecker.class));
+ assertThat(scalingEntry.getEnvironmentCheckerClass(), equalTo(OpenGaussEnvironmentChecker.class));
assertThat(scalingEntry.getEnvironmentCheckerClass().newInstance().getDataSourceCheckerClass(), equalTo(PostgreSQLDataSourceChecker.class));
assertThat(scalingEntry.getEnvironmentCheckerClass().newInstance().getDataConsistencyCheckerClass(), equalTo(PostgreSQLDataConsistencyChecker.class));
assertThat(scalingEntry.getImporterClass(), equalTo(OpenGaussImporter.class));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
index 86a2f03..d1acc7d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
@@ -20,6 +20,9 @@ package org.apache.shardingsphere.scaling.postgresql.component.checker;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
+/**
+ * Environment checker for PostgreSQL.
+ */
public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
@Override