You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/10/03 05:17:54 UTC
[shardingsphere] branch master updated: Refactor PipelineSQLBuilder.buildCreateSchemaSQL (#21325)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ef82d4c82d5 Refactor PipelineSQLBuilder.buildCreateSchemaSQL (#21325)
ef82d4c82d5 is described below
commit ef82d4c82d5b8c9374c529c8dd6cf4c2a3e889a7
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Mon Oct 3 13:17:47 2022 +0800
Refactor PipelineSQLBuilder.buildCreateSchemaSQL (#21325)
---
.../spi/sqlbuilder/PipelineSQLBuilder.java | 5 ++-
.../datasource/AbstractDataSourcePreparer.java | 38 +++++++++++-----------
.../core/sqlbuilder/OraclePipelineSQLBuilder.java | 6 ----
.../datasource/MySQLDataSourcePreparer.java | 4 +--
.../datasource/OpenGaussDataSourcePreparer.java | 4 +--
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 5 +--
.../datasource/PostgreSQLDataSourcePreparer.java | 4 +--
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 5 +--
.../datasource/AbstractDataSourcePreparerTest.java | 10 ------
9 files changed, 30 insertions(+), 51 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index ad2f8365d7e..13bc928224f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
@@ -41,8 +40,8 @@ public interface PipelineSQLBuilder extends TypedSPI, RequiredSPI {
* @param schemaName schema name
* @return create schema SQL
*/
- default String buildCreateSchemaSQL(String schemaName) {
- throw new UnsupportedSQLOperationException("buildCreateSchemaSQL");
+ default Optional<String> buildCreateSchemaSQL(String schemaName) {
+ return Optional.empty();
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 9dcf18e2a8d..32baf21e13d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -34,8 +34,9 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collection;
import java.util.HashSet;
-import java.util.Set;
+import java.util.Optional;
import java.util.regex.Pattern;
/**
@@ -59,33 +60,32 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
}
CreateTableConfiguration createTableConfig = parameter.getCreateTableConfig();
String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType).orElse(null);
- PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(targetDatabaseType.getType());
- Set<String> createdSchemaNames = new HashSet<>();
+ PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(targetDatabaseType.getType());
+ Collection<String> createdSchemaNames = new HashSet<>();
for (CreateTableEntry each : createTableConfig.getCreateTableEntries()) {
String targetSchemaName = each.getTargetName().getSchemaName().getOriginal();
- if (null == targetSchemaName) {
+ if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) {
continue;
}
- if (targetSchemaName.equalsIgnoreCase(defaultSchema)) {
- continue;
- }
- if (createdSchemaNames.contains(targetSchemaName)) {
- continue;
- }
- try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
- String sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
- log.info("prepareTargetSchemas, sql={}", sql);
- try (Statement statement = targetConnection.createStatement()) {
- statement.execute(sql);
- createdSchemaNames.add(targetSchemaName);
- } catch (final SQLException ignored) {
- }
+ Optional<String> sql = sqlBuilder.buildCreateSchemaSQL(targetSchemaName);
+ if (sql.isPresent()) {
+ executeCreateSchema(parameter.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get());
+ createdSchemaNames.add(targetSchemaName);
}
}
log.info("prepareTargetSchemas, createdSchemaNames={}, defaultSchema={}", createdSchemaNames, defaultSchema);
}
- protected final PipelineDataSourceWrapper getCachedDataSource(final PipelineDataSourceConfiguration dataSourceConfig, final PipelineDataSourceManager dataSourceManager) {
+ private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
+ log.info("prepareTargetSchemas, sql={}", sql);
+ try (Connection connection = getCachedDataSource(dataSourceManager, targetDataSourceConfig).getConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ }
+ }
+ }
+
+ protected final PipelineDataSourceWrapper getCachedDataSource(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration dataSourceConfig) {
return dataSourceManager.getDataSource(dataSourceConfig);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
index 871d81e400e..bbe8516cad4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineJobUniqueKeyDataTypeException;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import java.util.Map;
import java.util.Set;
@@ -32,11 +31,6 @@ import java.util.Set;
*/
public final class OraclePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
- @Override
- public String buildCreateSchemaSQL(final String schemaName) {
- throw new UnsupportedSQLOperationException("buildCreateSchemaSQL");
- }
-
@Override
public String buildInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType, final boolean firstQuery) {
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 9b7f0d2cde0..489775e35ef 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
-import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
@@ -29,7 +28,6 @@ import java.sql.SQLException;
/**
* Data source preparer for MySQL.
*/
-@Slf4j
public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
@Override
@@ -37,7 +35,7 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
for (CreateTableEntry each : parameter.getCreateTableConfig().getCreateTableEntries()) {
String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, parameter.getSqlParserEngine());
- try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
+ try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) {
executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(createTargetTableSQL));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index 78b6fef6559..bbba9d34795 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
import com.google.common.base.Splitter;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -32,7 +31,6 @@ import java.util.stream.Collectors;
/**
* Data source preparer for openGauss.
*/
-@Slf4j
public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePreparer {
@Override
@@ -40,7 +38,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
for (CreateTableEntry each : parameter.getCreateTableConfig().getCreateTableEntries()) {
String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, parameter.getSqlParserEngine());
- try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
+ try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) {
for (String sql : Splitter.on(";").splitToList(createTargetTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
executeTargetTableSQL(targetConnection, sql);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index a6de251311a..cadb19b69c5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineS
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -33,8 +34,8 @@ import java.util.stream.Collectors;
public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
@Override
- public String buildCreateSchemaSQL(final String schemaName) {
- return String.format("CREATE SCHEMA %s", quote(schemaName));
+ public Optional<String> buildCreateSchemaSQL(final String schemaName) {
+ return Optional.of(String.format("CREATE SCHEMA %s", quote(schemaName)));
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
index aa30e1c380c..b56d0028592 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
import com.google.common.base.Splitter;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -32,7 +31,6 @@ import java.util.stream.Collectors;
/**
* Data source preparer for PostgresSQL.
*/
-@Slf4j
public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePreparer {
@Override
@@ -40,7 +38,7 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
for (CreateTableEntry each : parameter.getCreateTableConfig().getCreateTableEntries()) {
String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, parameter.getSqlParserEngine());
- try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
+ try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) {
for (String sql : Splitter.on(";").splitToList(createTargetTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
executeTargetTableSQL(targetConnection, sql);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 8eac089286d..828551fa538 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
/**
@@ -32,8 +33,8 @@ import java.util.Set;
public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
@Override
- public String buildCreateSchemaSQL(final String schemaName) {
- return String.format("CREATE SCHEMA IF NOT EXISTS %s", quote(schemaName));
+ public Optional<String> buildCreateSchemaSQL(final String schemaName) {
+ return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", quote(schemaName)));
}
@Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
index 5ae0d06de3e..a06dccc897b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
@@ -17,8 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.junit.Test;
import java.sql.Connection;
@@ -46,14 +44,6 @@ public final class AbstractDataSourcePreparerTest {
}
};
- @Test
- public void assertGetCachedDataSource() {
- PipelineDataSourceConfiguration dataSourceConfig = mock(PipelineDataSourceConfiguration.class);
- PipelineDataSourceManager dataSourceManager = mock(PipelineDataSourceManager.class);
- preparer.getCachedDataSource(dataSourceConfig, dataSourceManager);
- verify(dataSourceManager).getDataSource(dataSourceConfig);
- }
-
@Test
public void assertExecuteTargetTableSQL() throws SQLException {
Statement statement = mock(Statement.class);