You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/09/16 03:35:23 UTC

[shardingsphere] branch master updated: Scaling prepare tables part 2 : Add AbstractDataSourcePreparer and add IF NOT EXISTS for create table SQL (#12459)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new aad707d  Scaling prepare tables part 2 : Add AbstractDataSourcePreparer and add IF NOT EXISTS for create table SQL (#12459)
aad707d is described below

commit aad707d723e52983c41ad653555094dc36d4eb35
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Sep 16 11:34:49 2021 +0800

    Scaling prepare tables part 2 : Add AbstractDataSourcePreparer and add IF NOT EXISTS for create table SQL (#12459)
    
    * Extract AbstractDataSourcePreparer
    
    * Add IF NOT EXISTS for create table SQL
---
 .../job/preparer/AbstractDataSourcePreparer.java}  | 59 ++++++++--------------
 .../preparer/AbstractDataSourcePreparerTest.java   | 53 +++++++++++++++++++
 .../component/checker/MySQLDataSourcePreparer.java | 49 ++++--------------
 3 files changed, 84 insertions(+), 77 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
similarity index 50%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
index 96949dc..379fc3b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
@@ -15,60 +15,48 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.mysql.component.checker;
+package org.apache.shardingsphere.scaling.core.job.preparer;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceFactory;
 import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceWrapper;
-import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.yaml.ShardingRuleConfigurationSwapper;
-import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
-import org.apache.shardingsphere.scaling.mysql.component.MySQLScalingSQLBuilder;
 import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
 import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
 import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
 
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
- * Data source preparer for MySQL.
+ * Abstract data source preparer.
  */
 @Slf4j
-public final class MySQLDataSourcePreparer implements DataSourcePreparer {
+public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
+    
+    private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);
+    
+    private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
     
     private final DataSourceFactory dataSourceFactory = new DataSourceFactory();
     
-    private final MySQLScalingSQLBuilder scalingSQLBuilder = new MySQLScalingSQLBuilder(Collections.emptyMap());
+    protected DataSourceWrapper getSourceDataSource(final JobConfiguration jobConfig) {
+        return dataSourceFactory.newInstance(jobConfig.getRuleConfig().getSource().unwrap());
+    }
     
-    @Override
-    public void prepareTargetTables(final JobConfiguration jobConfig) {
-        ScalingDataSourceConfiguration sourceConfig = jobConfig.getRuleConfig().getSource().unwrap();
-        ScalingDataSourceConfiguration targetConfig = jobConfig.getRuleConfig().getTarget().unwrap();
-        try (DataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceConfig);
-             Connection sourceConnection = sourceDataSource.getConnection();
-             DataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetConfig);
-             Connection targetConnection = targetDataSource.getConnection()) {
-            List<String> logicTableNames = getLogicTableNames(sourceConfig);
-            for (String logicTableName : logicTableNames) {
-                createTargetTable(sourceConnection, targetConnection, logicTableName);
-                log.info("create target table '{}' success", logicTableName);
-            }
-        } catch (final SQLException ex) {
-            throw new PrepareFailedException("prepare target tables failed.", ex);
-        }
+    protected DataSourceWrapper getTargetDataSource(final JobConfiguration jobConfig) {
+        return dataSourceFactory.newInstance(jobConfig.getRuleConfig().getTarget().unwrap());
     }
     
-    private List<String> getLogicTableNames(final ScalingDataSourceConfiguration sourceConfig) {
+    protected List<String> getLogicTableNames(final ScalingDataSourceConfiguration sourceConfig) {
         List<String> result = new ArrayList<>();
         ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
         ShardingRuleConfiguration ruleConfig = ShardingRuleConfigurationSwapper.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
@@ -79,21 +67,18 @@ public final class MySQLDataSourcePreparer implements DataSourcePreparer {
         return result;
     }
     
-    private void createTargetTable(final Connection sourceConnection, final Connection targetConnection, final String logicTableName) throws SQLException {
-        String createTableSQL = getCreateTableSQL(sourceConnection, logicTableName);
-        log.info("logicTableName: {}, createTableSQL: {}", logicTableName, createTableSQL);
+    protected void createTargetTable(final Connection targetConnection, final String createTableSQL) throws SQLException {
+        String sql = addIfNotExistsForCreateTableSQL(createTableSQL);
+        log.info("create target table, sql: {}", sql);
         try (Statement statement = targetConnection.createStatement()) {
-            statement.execute(createTableSQL);
+            statement.execute(sql);
         }
     }
     
-    private String getCreateTableSQL(final Connection sourceConnection, final String logicTableName) throws SQLException {
-        String showCreateTableSQL = "SHOW CREATE TABLE " + scalingSQLBuilder.quote(logicTableName);
-        try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(showCreateTableSQL)) {
-            if (!resultSet.next()) {
-                throw new PrepareFailedException("show create table has no result, sql: " + showCreateTableSQL);
-            }
-            return resultSet.getString(2);
+    private String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
+        if (PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find()) {
+            return createTableSQL;
         }
+        return PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/preparer/AbstractDataSourcePreparerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/preparer/AbstractDataSourcePreparerTest.java
new file mode 100644
index 0000000..b180a4a
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/preparer/AbstractDataSourcePreparerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.preparer;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.job.preparer.AbstractDataSourcePreparer;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+public final class AbstractDataSourcePreparerTest {
+    
+    private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);
+    
+    @Test
+    @SneakyThrows(ReflectiveOperationException.class)
+    public void assertAddIfNotExistsForCreateTableSQL() {
+        Method method = AbstractDataSourcePreparer.class.getDeclaredMethod("addIfNotExistsForCreateTableSQL", String.class);
+        method.setAccessible(true);
+        AbstractDataSourcePreparer preparer = new AbstractDataSourcePreparer() {
+            @Override
+            public void prepareTargetTables(final JobConfiguration jobConfig) {
+            }
+        };
+        List<String> createTableSQLs = Arrays.asList("CREATE TABLE IF NOT EXISTS t (id int)", "CREATE TABLE t (id int)",
+                "CREATE  TABLE IF \nNOT \tEXISTS t (id int)", "CREATE \tTABLE t (id int)");
+        for (String createTableSQL : createTableSQLs) {
+            String sql = (String) method.invoke(preparer, createTableSQL);
+            assertTrue(PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(sql).find());
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
index 96949dc..af7f2fe 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/checker/MySQLDataSourcePreparer.java
@@ -18,75 +18,44 @@
 package org.apache.shardingsphere.scaling.mysql.component.checker;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceFactory;
 import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceWrapper;
 import org.apache.shardingsphere.scaling.core.common.exception.PrepareFailedException;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.yaml.ShardingRuleConfigurationSwapper;
-import org.apache.shardingsphere.scaling.core.job.preparer.DataSourcePreparer;
+import org.apache.shardingsphere.scaling.core.job.preparer.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.scaling.mysql.component.MySQLScalingSQLBuilder;
-import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
-import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Data source preparer for MySQL.
  */
 @Slf4j
-public final class MySQLDataSourcePreparer implements DataSourcePreparer {
-    
-    private final DataSourceFactory dataSourceFactory = new DataSourceFactory();
+public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
     
     private final MySQLScalingSQLBuilder scalingSQLBuilder = new MySQLScalingSQLBuilder(Collections.emptyMap());
     
     @Override
     public void prepareTargetTables(final JobConfiguration jobConfig) {
-        ScalingDataSourceConfiguration sourceConfig = jobConfig.getRuleConfig().getSource().unwrap();
-        ScalingDataSourceConfiguration targetConfig = jobConfig.getRuleConfig().getTarget().unwrap();
-        try (DataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceConfig);
+        try (DataSourceWrapper sourceDataSource = getSourceDataSource(jobConfig);
              Connection sourceConnection = sourceDataSource.getConnection();
-             DataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetConfig);
+             DataSourceWrapper targetDataSource = getTargetDataSource(jobConfig);
              Connection targetConnection = targetDataSource.getConnection()) {
-            List<String> logicTableNames = getLogicTableNames(sourceConfig);
-            for (String logicTableName : logicTableNames) {
-                createTargetTable(sourceConnection, targetConnection, logicTableName);
-                log.info("create target table '{}' success", logicTableName);
+            List<String> logicTableNames = getLogicTableNames(jobConfig.getRuleConfig().getSource().unwrap());
+            for (String each : logicTableNames) {
+                String createTableSQL = getCreateTableSQL(sourceConnection, each);
+                createTargetTable(targetConnection, createTableSQL);
+                log.info("create target table '{}' success", each);
             }
         } catch (final SQLException ex) {
             throw new PrepareFailedException("prepare target tables failed.", ex);
         }
     }
     
-    private List<String> getLogicTableNames(final ScalingDataSourceConfiguration sourceConfig) {
-        List<String> result = new ArrayList<>();
-        ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
-        ShardingRuleConfiguration ruleConfig = ShardingRuleConfigurationSwapper.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
-        List<String> tableNames = ruleConfig.getTables().stream().map(ShardingTableRuleConfiguration::getLogicTable).collect(Collectors.toList());
-        List<String> autoTableNames = ruleConfig.getAutoTables().stream().map(ShardingAutoTableRuleConfiguration::getLogicTable).collect(Collectors.toList());
-        result.addAll(tableNames);
-        result.addAll(autoTableNames);
-        return result;
-    }
-    
-    private void createTargetTable(final Connection sourceConnection, final Connection targetConnection, final String logicTableName) throws SQLException {
-        String createTableSQL = getCreateTableSQL(sourceConnection, logicTableName);
-        log.info("logicTableName: {}, createTableSQL: {}", logicTableName, createTableSQL);
-        try (Statement statement = targetConnection.createStatement()) {
-            statement.execute(createTableSQL);
-        }
-    }
-    
     private String getCreateTableSQL(final Connection sourceConnection, final String logicTableName) throws SQLException {
         String showCreateTableSQL = "SHOW CREATE TABLE " + scalingSQLBuilder.quote(logicTableName);
         try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(showCreateTableSQL)) {