You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/09/17 23:46:46 UTC

[shardingsphere] branch master updated: Scaling prepare tables part 3 : Implement openGauss dialect (#12523)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ce6163  Scaling prepare tables part 3 : Implement openGauss dialect (#12523)
6ce6163 is described below

commit 6ce616322361d187ffddd1685618bd17fc35f694
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Sep 18 07:46:11 2021 +0800

    Scaling prepare tables part 3 : Implement openGauss dialect (#12523)
---
 .../shardingsphere-scaling-bootstrap/pom.xml       |   1 -
 .../job/preparer/AbstractDataSourcePreparer.java   |  76 ++++++++++-
 .../core/job/preparer/ActualTableDefinition.java}  |  36 +++---
 .../core/job/preparer/TableDefinitionSQLType.java} |  28 ++---
 .../scaling/core/util/JobConfigurationUtil.java    |  25 +++-
 .../preparer/AbstractDataSourcePreparerTest.java   |  46 +++++--
 .../component/checker/MySQLDataSourcePreparer.java |   7 +-
 .../scaling/opengauss/OpenGaussScalingEntry.java   |   6 +-
 .../checker/OpenGaussDataSourcePreparer.java       | 140 +++++++++++++++++++++
 .../checker/OpenGaussEnvironmentChecker.java}      |  15 ++-
 .../opengauss/OpenGaussScalingEntryTest.java       |   4 +-
 .../checker/PostgreSQLEnvironmentChecker.java      |   3 +
 12 files changed, 314 insertions(+), 73 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
index 23819e6..4ce7829 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/pom.xml
@@ -44,7 +44,6 @@
             <artifactId>shardingsphere-scaling-postgresql</artifactId>
             <version>${project.version}</version>
         </dependency>
-
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
             <artifactId>shardingsphere-scaling-opengauss</artifactId>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
index 379fc3b..661733b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
@@ -18,21 +18,33 @@
 package org.apache.shardingsphere.scaling.core.job.preparer;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
+import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceFactory;
 import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceWrapper;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.yaml.ShardingRuleConfigurationSwapper;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
 import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
 import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
 import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -46,6 +58,8 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
     
     private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
     
+    private static final Pattern PATTERN_ALTER_TABLE = Pattern.compile("ALTER\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
+    
     private final DataSourceFactory dataSourceFactory = new DataSourceFactory();
     
     protected DataSourceWrapper getSourceDataSource(final JobConfiguration jobConfig) {
@@ -56,10 +70,14 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         return dataSourceFactory.newInstance(jobConfig.getRuleConfig().getTarget().unwrap());
     }
     
-    protected List<String> getLogicTableNames(final ScalingDataSourceConfiguration sourceConfig) {
-        List<String> result = new ArrayList<>();
+    protected Collection<String> getLogicTableNames(final ScalingDataSourceConfiguration sourceConfig) {
         ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
         ShardingRuleConfiguration ruleConfig = ShardingRuleConfigurationSwapper.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
+        return getLogicTableNames(ruleConfig);
+    }
+    
+    private Collection<String> getLogicTableNames(final ShardingRuleConfiguration ruleConfig) {
+        Collection<String> result = new ArrayList<>();
         List<String> tableNames = ruleConfig.getTables().stream().map(ShardingTableRuleConfiguration::getLogicTable).collect(Collectors.toList());
         List<String> autoTableNames = ruleConfig.getAutoTables().stream().map(ShardingAutoTableRuleConfiguration::getLogicTable).collect(Collectors.toList());
         result.addAll(tableNames);
@@ -67,18 +85,64 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         return result;
     }
     
-    protected void createTargetTable(final Connection targetConnection, final String createTableSQL) throws SQLException {
-        String sql = addIfNotExistsForCreateTableSQL(createTableSQL);
-        log.info("create target table, sql: {}", sql);
+    /**
+     * Get data source table names map.
+     *
+     * @param sourceConfig source data source configuration
+     * @return data source table names map. map(data source, map(first actual table name of logic table, logic table name)).
+     */
+    protected Map<DataSource, Map<String, String>> getDataSourceTableNamesMap(final ScalingDataSourceConfiguration sourceConfig) {
+        ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
+        ShardingRuleConfiguration ruleConfig = ShardingRuleConfigurationSwapper.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
+        Map<String, DataSourceConfiguration> dataSourceConfigs = JobConfigurationUtil.getDataSourceConfigurations(source.getRootConfig());
+        Map<String, DataSource> dataSourceMap = dataSourceConfigs.entrySet().stream().collect(
+                Collectors.toMap(Entry::getKey, entry -> new DataSourceWrapper(null), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+        ShardingRule shardingRule = new ShardingRule(ruleConfig, dataSourceMap);
+        Collection<String> logicTableNames = getLogicTableNames(ruleConfig);
+        Map<String, Map<String, String>> dataSourceNameTableNamesMap = new HashMap<>();
+        for (String each : logicTableNames) {
+            DataNode dataNode = shardingRule.getDataNode(each);
+            dataSourceNameTableNamesMap.computeIfAbsent(dataNode.getDataSourceName(), key -> new LinkedHashMap<>()).put(dataNode.getTableName(), each);
+        }
+        return dataSourceNameTableNamesMap.entrySet().stream().collect(
+                Collectors.toMap(entry -> DataSourceConverter.getDataSource(dataSourceConfigs.get(entry.getKey())), Entry::getValue, (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+    }
+    
+    protected void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
+        log.info("execute target table sql: {}", sql);
         try (Statement statement = targetConnection.createStatement()) {
             statement.execute(sql);
         }
     }
     
-    private String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
+    protected Collection<String> splitTableDefinitionToSQLs(final ActualTableDefinition actualTableDefinition) {
+        return Arrays.stream(actualTableDefinition.getTableDefinition().split(";")).collect(Collectors.toList());
+    }
+    
+    //TODO simple lexer
+    protected TableDefinitionSQLType getTableDefinitionSQLType(final String sql) {
+        if (PATTERN_CREATE_TABLE.matcher(sql).find()) {
+            return TableDefinitionSQLType.CREATE_TABLE;
+        }
+        if (PATTERN_ALTER_TABLE.matcher(sql).find()) {
+            return TableDefinitionSQLType.ALTER_TABLE;
+        }
+        return TableDefinitionSQLType.UNKNOWN;
+    }
+    
+    protected String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
         if (PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find()) {
             return createTableSQL;
         }
         return PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
     }
+    
+    protected String replaceActualTableNameToLogicTableName(final String createOrAlterTableSQL, final String actualTableName, final String logicTableName) {
+        int start = createOrAlterTableSQL.indexOf(actualTableName);
+        if (start < 0 || actualTableName.isEmpty()) { // indexOf yields -1 when absent; a match at index 0 is valid and must still be replaced
+            return createOrAlterTableSQL;
+        }
+        int end = start + actualTableName.length(); // only the first occurrence is replaced, later ones (e.g. in constraint names) are kept
+        return new StringBuilder(createOrAlterTableSQL).replace(start, end, logicTableName).toString();
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ActualTableDefinition.java
similarity index 52%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ActualTableDefinition.java
index 86a2f03..fa6227f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ActualTableDefinition.java
@@ -15,26 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.postgresql.component.checker;
+package org.apache.shardingsphere.scaling.core.job.preparer;
 
-import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
-import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
 
-public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
+/**
+ * Actual table definition.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class ActualTableDefinition {
     
-    @Override
-    public Class<PostgreSQLDataSourceChecker> getDataSourceCheckerClass() {
-        return PostgreSQLDataSourceChecker.class;
-    }
+    private final String logicTableName;
     
-    @Override
-    public Class<PostgreSQLDataConsistencyChecker> getDataConsistencyCheckerClass() {
-        return PostgreSQLDataConsistencyChecker.class;
-    }
+    private final String actualTableName;
     
-    @Override
-    public Class<? extends DataSourcePreparer> getDataSourcePreparerClass() {
-        //TODO
-        return null;
-    }
+    /**
+     * One or more actual table definition SQL statements, separated with ';'.
+     * <p>
+     * May be <code>CREATE TABLE</code>, <code>ALTER TABLE</code>, <code>TABLESPACE</code>, <code>SET search_path</code>, etc.
+     */
+    private final String tableDefinition;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/TableDefinitionSQLType.java
similarity index 51%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/TableDefinitionSQLType.java
index 86a2f03..09b4031 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/TableDefinitionSQLType.java
@@ -15,26 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.postgresql.component.checker;
+package org.apache.shardingsphere.scaling.core.job.preparer;
 
-import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
-import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
-
-public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
-    
-    @Override
-    public Class<PostgreSQLDataSourceChecker> getDataSourceCheckerClass() {
-        return PostgreSQLDataSourceChecker.class;
-    }
-    
-    @Override
-    public Class<PostgreSQLDataConsistencyChecker> getDataConsistencyCheckerClass() {
-        return PostgreSQLDataConsistencyChecker.class;
-    }
+/**
+ * Table definition SQL type.
+ */
+public enum TableDefinitionSQLType {
     
-    @Override
-    public Class<? extends DataSourcePreparer> getDataSourcePreparerClass() {
-        //TODO
-        return null;
-    }
+    UNKNOWN,
+    CREATE_TABLE,
+    ALTER_TABLE,
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
index 2a03616..7454da0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
@@ -222,9 +222,10 @@ public final class JobConfigurationUtil {
         List<TaskConfiguration> result = new LinkedList<>();
         ShardingSphereJDBCDataSourceConfiguration sourceConfig = getSourceConfiguration(jobConfig);
         ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationSwapper.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
-        Map<String, DataSourceConfiguration> sourceDataSource = getDataSourceConfigurations(sourceConfig.getRootConfig().getDataSources());
+        Map<String, DataSourceConfiguration> sourceDataSource = getDataSourceConfigurations(sourceConfig.getRootConfig());
         Map<String, DataSource> dataSourceMap = DataSourceConverter.getDataSourceMap(sourceDataSource);
         Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(new ShardingRule(sourceRuleConfig, dataSourceMap));
+        closeDataSources(dataSourceMap.values());
         Optional<ShardingRuleConfiguration> targetRuleConfig = getTargetRuleConfiguration(jobConfig);
         filterByShardingDataSourceTables(dataSourceTableNameMap, jobConfig.getHandleConfig());
         Map<String, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig));
@@ -242,12 +243,32 @@ public final class JobConfigurationUtil {
         return (ShardingSphereJDBCDataSourceConfiguration) result;
     }
     
-    private static Map<String, DataSourceConfiguration> getDataSourceConfigurations(final Map<String, Map<String, Object>> yamlDataSourceConfigs) {
+    /**
+     * Get data source configurations.
+     *
+     * @param yamlRootConfiguration yaml root configuration
+     * @return data source name to data source configuration map
+     */
+    public static Map<String, DataSourceConfiguration> getDataSourceConfigurations(final YamlRootConfiguration yamlRootConfiguration) {
+        Map<String, Map<String, Object>> yamlDataSourceConfigs = yamlRootConfiguration.getDataSources();
         Map<String, DataSourceConfiguration> result = new LinkedHashMap<>(yamlDataSourceConfigs.size());
         yamlDataSourceConfigs.forEach((key, value) -> result.put(key, new YamlDataSourceConfigurationSwapper().swapToDataSourceConfiguration(value)));
         return result;
     }
     
+    private static void closeDataSources(final Collection<DataSource> dataSources) {
+        for (DataSource dataSource : dataSources) {
+            if (dataSource instanceof AutoCloseable) {
+                try {
+                    ((AutoCloseable) dataSource).close();
+                    //CHECKSTYLE:OFF
+                } catch (final Exception ignored) {
+                    //CHECKSTYLE:ON
+                }
+            }
+        }
+    }
+    
     private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final JobConfiguration jobConfig) {
         ScalingDataSourceConfiguration dataSourceConfig = jobConfig.getRuleConfig().getTarget().unwrap();
         if (dataSourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/preparer/AbstractDataSourcePreparerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparerTest.java
similarity index 50%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/preparer/AbstractDataSourcePreparerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparerTest.java
index b180a4a..fc328c2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/preparer/AbstractDataSourcePreparerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparerTest.java
@@ -15,39 +15,59 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.preparer;
+package org.apache.shardingsphere.scaling.core.job.preparer;
 
-import lombok.SneakyThrows;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.job.preparer.AbstractDataSourcePreparer;
 import org.junit.Test;
 
-import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public final class AbstractDataSourcePreparerTest {
     
     private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);
     
+    private final AbstractDataSourcePreparer preparer = new AbstractDataSourcePreparer() {
+        @Override
+        public void prepareTargetTables(final JobConfiguration jobConfig) {
+        }
+    };
+    
+    @Test
+    public void assertGetTableDefinitionSQLType() {
+        Collection<Pair<String, TableDefinitionSQLType>> pairs = new ArrayList<>();
+        pairs.add(Pair.of("SET search_path = public", TableDefinitionSQLType.UNKNOWN));
+        pairs.add(Pair.of("CREATE TABLE t1_0 (id int NOT NULL)", TableDefinitionSQLType.CREATE_TABLE));
+        pairs.add(Pair.of("ALTER TABLE t1_0 ADD CONSTRAINT t1_0_pkey PRIMARY KEY (id)", TableDefinitionSQLType.ALTER_TABLE));
+        for (Pair<String, TableDefinitionSQLType> each : pairs) {
+            TableDefinitionSQLType sqlType = preparer.getTableDefinitionSQLType(each.getKey());
+            assertThat(sqlType, is(each.getValue()));
+        }
+    }
+    
     @Test
-    @SneakyThrows(ReflectiveOperationException.class)
     public void assertAddIfNotExistsForCreateTableSQL() {
-        Method method = AbstractDataSourcePreparer.class.getDeclaredMethod("addIfNotExistsForCreateTableSQL", String.class);
-        method.setAccessible(true);
-        AbstractDataSourcePreparer preparer = new AbstractDataSourcePreparer() {
-            @Override
-            public void prepareTargetTables(final JobConfiguration jobConfig) {
-            }
-        };
         List<String> createTableSQLs = Arrays.asList("CREATE TABLE IF NOT EXISTS t (id int)", "CREATE TABLE t (id int)",
                 "CREATE  TABLE IF \nNOT \tEXISTS t (id int)", "CREATE \tTABLE t (id int)");
         for (String createTableSQL : createTableSQLs) {
-            String sql = (String) method.invoke(preparer, createTableSQL);
+            String sql = preparer.addIfNotExistsForCreateTableSQL(createTableSQL);
             assertTrue(PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(sql).find());
         }
     }
+    
+    @Test
+    public void assertReplaceActualTableNameToLogicTableName() {
+        String sql = "ALTER TABLE t_order_0 ADD CONSTRAINT t_order_0_uniq UNIQUE (order_id)";
+        String expected = "ALTER TABLE t_order ADD CONSTRAINT t_order_0_uniq UNIQUE (order_id)";
+        String actual = preparer.replaceActualTableNameToLogicTableName(sql, "t_order_0", "t_order");
+        assertThat(actual, is(expected));
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
index af7f2fe..7a7e1d7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
@@ -28,8 +28,8 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * Data source preparer for MySQL.
@@ -45,10 +45,11 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
              Connection sourceConnection = sourceDataSource.getConnection();
              DataSourceWrapper targetDataSource = getTargetDataSource(jobConfig);
              Connection targetConnection = targetDataSource.getConnection()) {
-            List<String> logicTableNames = getLogicTableNames(jobConfig.getRuleConfig().getSource().unwrap());
+            Collection<String> logicTableNames = getLogicTableNames(jobConfig.getRuleConfig().getSource().unwrap());
             for (String each : logicTableNames) {
                 String createTableSQL = getCreateTableSQL(sourceConnection, each);
-                createTargetTable(targetConnection, createTableSQL);
+                createTableSQL = addIfNotExistsForCreateTableSQL(createTableSQL);
+                executeTargetTableSQL(targetConnection, createTableSQL);
                 log.info("create target table '{}' success", each);
             }
         } catch (final SQLException ex) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java
index 07f54b2..3a6f01d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntry.java
@@ -22,8 +22,8 @@ import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussImporter;
 import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussPositionInitializer;
 import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussScalingSQLBuilder;
 import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussWalDumper;
+import org.apache.shardingsphere.scaling.opengauss.component.checker.OpenGaussEnvironmentChecker;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLInventoryDumper;
-import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLEnvironmentChecker;
 
 /**
  * OpenGauss scaling entry.
@@ -51,8 +51,8 @@ public final class OpenGaussScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<PostgreSQLEnvironmentChecker> getEnvironmentCheckerClass() {
-        return PostgreSQLEnvironmentChecker.class;
+    public Class<OpenGaussEnvironmentChecker> getEnvironmentCheckerClass() {
+        return OpenGaussEnvironmentChecker.class;
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
new file mode 100644
index 0000000..f6f791c
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussDataSourcePreparer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.component.checker;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceWrapper;
+import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.job.preparer.AbstractDataSourcePreparer;
+import org.apache.shardingsphere.scaling.core.job.preparer.ActualTableDefinition;
+import org.apache.shardingsphere.scaling.core.job.preparer.TableDefinitionSQLType;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Data source preparer for openGauss.
+ */
+@Slf4j
+public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePreparer {
+    
+    @Override
+    public void prepareTargetTables(final JobConfiguration jobConfig) {
+        Collection<ActualTableDefinition> actualTableDefinitions;
+        try {
+            actualTableDefinitions = getActualTableDefinitions(jobConfig);
+        } catch (final SQLException ex) {
+            throw new PrepareFailedException("get table definitions failed.", ex);
+        }
+        Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
+        try (DataSourceWrapper targetDataSource = getTargetDataSource(jobConfig);
+             Connection targetConnection = targetDataSource.getConnection()) {
+            for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
+                for (String each : entry.getValue()) {
+                    executeTargetTableSQL(targetConnection, each);
+                }
+                log.info("create target table '{}' success", entry.getKey());
+            }
+        } catch (final SQLException ex) {
+            throw new PrepareFailedException("prepare target tables failed.", ex);
+        }
+    }
+    
+    private Collection<ActualTableDefinition> getActualTableDefinitions(final JobConfiguration jobConfig) throws SQLException {
+        Collection<ActualTableDefinition> result = new ArrayList<>();
+        Map<DataSource, Map<String, String>> dataSourceTableNamesMap = getDataSourceTableNamesMap(jobConfig.getRuleConfig().getSource().unwrap());
+        for (Entry<DataSource, Map<String, String>> entry : dataSourceTableNamesMap.entrySet()) {
+            try (DataSourceWrapper dataSource = new DataSourceWrapper(entry.getKey());
+                 Connection sourceConnection = dataSource.getConnection()) {
+                for (Entry<String, String> tableNameEntry : entry.getValue().entrySet()) {
+                    String actualTableName = tableNameEntry.getKey(); // inner map is keyed by actual table name, see getDataSourceTableNamesMap
+                    int oid = queryTableOid(sourceConnection, actualTableName);
+                    String tableDefinition = queryTableDefinition(sourceConnection, oid);
+                    String logicTableName = tableNameEntry.getValue(); // the mapped value is the logic table name
+                    result.add(new ActualTableDefinition(logicTableName, actualTableName, tableDefinition));
+                }
+            }
+        }
+        return result;
+    }
+    
+    private int queryTableOid(final Connection sourceConnection, final String actualTableName) throws SQLException {
+        String sql = "SELECT oid FROM pg_class WHERE relname = ?"; // looked up by relname only, no schema filter
+        try (PreparedStatement preparedStatement = sourceConnection.prepareStatement(sql)) {
+            preparedStatement.setString(1, actualTableName);
+            try (ResultSet rows = preparedStatement.executeQuery()) {
+                if (rows.next()) {
+                    return rows.getInt(1); // oid of the first matching row
+                }
+                throw new PrepareFailedException("select oid has no result, sql: " + sql + ", actualTableName: " + actualTableName);
+            }
+        }
+    }
+    
+    private String queryTableDefinition(final Connection sourceConnection, final int oid) throws SQLException {
+        String sql = "SELECT * FROM pg_get_tabledef(" + oid + ")"; // plain concatenation: String.format("%d") localizes digits per default locale
+        try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) {
+            if (!resultSet.next()) {
+                throw new PrepareFailedException("table definition has no result, sql: " + sql);
+            }
+            return resultSet.getString(1); // first column of the first row is returned as the definition
+        }
+    }
+    
+    /**
+     * Get create logic table SQLs.
+     *
+     * @param actualTableDefinitions actual table definitions, each one carries its logic table name and actual table name
+     * @return all SQLs. key is logic table name, value is collection of logic table SQLs
+     */
+    private Map<String, Collection<String>> getCreateLogicTableSQLs(final Collection<ActualTableDefinition> actualTableDefinitions) {
+        Map<String, Collection<String>> result = new HashMap<>();
+        for (ActualTableDefinition each : actualTableDefinitions) {
+            Collection<String> logicTableSQLs = new ArrayList<>();
+            for (String sql : splitTableDefinitionToSQLs(each)) {
+                //TODO replace constraint and index name
+                switch (getTableDefinitionSQLType(sql)) {
+                    case CREATE_TABLE:
+                        // IF NOT EXISTS is added first, then the actual table name is replaced by the logic one.
+                        logicTableSQLs.add(replaceActualTableNameToLogicTableName(addIfNotExistsForCreateTableSQL(sql), each.getActualTableName(), each.getLogicTableName()));
+                        break;
+                    case ALTER_TABLE:
+                        logicTableSQLs.add(replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName()));
+                        break;
+                    default:
+                        // UNKNOWN and any other statement type are passed through unchanged.
+                        logicTableSQLs.add(sql);
+                        break;
+                }
+            }
+            result.put(each.getLogicTableName(), logicTableSQLs); // a later definition of the same logic table replaces the earlier entry
+        }
+        return result;
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussEnvironmentChecker.java
similarity index 69%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
copy to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussEnvironmentChecker.java
index 86a2f03..a44a1b0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/checker/OpenGaussEnvironmentChecker.java
@@ -15,12 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.postgresql.component.checker;
+package org.apache.shardingsphere.scaling.opengauss.component.checker;
 
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
-import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
+import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataConsistencyChecker;
+import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataSourceChecker;
 
-public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
+/**
+ * Environment checker for openGauss.
+ */
+public final class OpenGaussEnvironmentChecker implements EnvironmentChecker {
     
     @Override
     public Class<PostgreSQLDataSourceChecker> getDataSourceCheckerClass() {
@@ -33,8 +37,7 @@ public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
     }
     
     @Override
-    public Class<? extends DataSourcePreparer> getDataSourcePreparerClass() {
-        //TODO
-        return null;
+    public Class<OpenGaussDataSourcePreparer> getDataSourcePreparerClass() {
+        return OpenGaussDataSourcePreparer.class;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java
index cd1a377..777d29d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/OpenGaussScalingEntryTest.java
@@ -22,10 +22,10 @@ import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
 import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussImporter;
 import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussPositionInitializer;
 import org.apache.shardingsphere.scaling.opengauss.component.OpenGaussWalDumper;
+import org.apache.shardingsphere.scaling.opengauss.component.checker.OpenGaussEnvironmentChecker;
 import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLInventoryDumper;
 import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataConsistencyChecker;
 import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLDataSourceChecker;
-import org.apache.shardingsphere.scaling.postgresql.component.checker.PostgreSQLEnvironmentChecker;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -39,7 +39,7 @@ public final class OpenGaussScalingEntryTest {
         ScalingEntry scalingEntry = ScalingEntryLoader.getInstance("openGauss");
         assertTrue(scalingEntry instanceof OpenGaussScalingEntry);
         assertThat(scalingEntry.getPositionInitializerClass(), equalTo(OpenGaussPositionInitializer.class));
-        assertThat(scalingEntry.getEnvironmentCheckerClass(), equalTo(PostgreSQLEnvironmentChecker.class));
+        assertThat(scalingEntry.getEnvironmentCheckerClass(), equalTo(OpenGaussEnvironmentChecker.class));
         assertThat(scalingEntry.getEnvironmentCheckerClass().newInstance().getDataSourceCheckerClass(), equalTo(PostgreSQLDataSourceChecker.class));
         assertThat(scalingEntry.getEnvironmentCheckerClass().newInstance().getDataConsistencyCheckerClass(), equalTo(PostgreSQLDataConsistencyChecker.class));
         assertThat(scalingEntry.getImporterClass(), equalTo(OpenGaussImporter.class));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
index 86a2f03..d1acc7d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/checker/PostgreSQLEnvironmentChecker.java
@@ -20,6 +20,9 @@ package org.apache.shardingsphere.scaling.postgresql.component.checker;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
 import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
 
+/**
+ * Environment checker for PostgreSQL.
+ */
 public final class PostgreSQLEnvironmentChecker implements EnvironmentChecker {
     
     @Override