You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/07/27 09:11:32 UTC
[shardingsphere] branch master updated: Remove TaskConfiguration reference from PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#19455)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new da9e563facc Remove TaskConfiguration reference from PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#19455)
da9e563facc is described below
commit da9e563faccf2344ed250ca64a3caea783c95ebb
Author: Da Xiang Huang <lo...@foxmail.com>
AuthorDate: Wed Jul 27 17:11:25 2022 +0800
Remove TaskConfiguration reference from PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#19455)
---
.../datasource/AbstractDataSourcePreparer.java | 35 ++++++++-------------
.../datasource/PrepareTargetSchemasParameter.java | 12 ++++++--
.../datasource/PrepareTargetTablesParameter.java | 26 ++++++----------
.../rulealtered/RuleAlteredJobPreparer.java | 36 +++++++++++++++-------
.../datasource/MySQLDataSourcePreparer.java | 4 +--
.../datasource/MySQLDataSourcePreparerTest.java | 2 +-
.../datasource/OpenGaussDataSourcePreparer.java | 4 +--
.../datasource/PostgreSQLDataSourcePreparer.java | 4 +--
8 files changed, 65 insertions(+), 58 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index abf908c50ac..a5bde33ac74 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,25 +17,22 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
/**
* Abstract data source preparer.
@@ -51,17 +48,11 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
@Override
public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) {
- DatabaseType sourceDatabaseType = DatabaseTypeFactory.getInstance(parameter.getTaskConfig().getJobConfig().getSourceDatabaseType());
- DatabaseType targetDatabaseType = DatabaseTypeFactory.getInstance(parameter.getTaskConfig().getJobConfig().getTargetDatabaseType());
- if (!sourceDatabaseType.isSchemaAvailable() || !targetDatabaseType.isSchemaAvailable()) {
- log.info("prepareTargetSchemas, one of source or target database type schema is not available, ignore");
- return;
- }
Set<String> schemaNames = getSchemaNames(parameter);
- String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType, parameter.getTaskConfig().getJobConfig().getDatabaseName());
+ String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(parameter.getTargetDatabaseType(), parameter.getDatabaseName());
log.info("prepareTargetSchemas, schemaNames={}, defaultSchema={}", schemaNames, defaultSchema);
- PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(targetDatabaseType.getType());
- try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
+ PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getTargetDatabaseType().getType());
+ try (Connection targetConnection = getTargetCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
for (String each : schemaNames) {
if (each.equalsIgnoreCase(defaultSchema)) {
continue;
@@ -80,7 +71,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
private Set<String> getSchemaNames(final PrepareTargetSchemasParameter parameter) {
Set<String> result = new HashSet<>();
- for (String each : parameter.getTaskConfig().getJobConfig().splitLogicTableNames()) {
+ for (String each : parameter.getLogicTableNames()) {
String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each);
if (null == schemaName) {
throw new PipelineJobPrepareFailedException("Can not get schemaName by logic table name " + each);
@@ -95,8 +86,8 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
}
- protected final PipelineDataSourceWrapper getTargetCachedDataSource(final TaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) {
- return dataSourceManager.getDataSource(taskConfig.getImporterConfig().getDataSourceConfig());
+ protected final PipelineDataSourceWrapper getTargetCachedDataSource(final PipelineDataSourceConfiguration dataSourceConfig, final PipelineDataSourceManager dataSourceManager) {
+ return dataSourceManager.getDataSource(dataSourceConfig);
}
protected final void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
index b19cd0afd7b..5c4a4b382bc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
@@ -17,11 +17,13 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
+import java.util.List;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
/**
* Prepare target schemas parameter.
@@ -30,7 +32,13 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
@Getter
public final class PrepareTargetSchemasParameter {
- private final TaskConfiguration taskConfig;
+ private final List<String> logicTableNames;
+
+ private final DatabaseType targetDatabaseType;
+
+ private final String databaseName;
+
+ private final PipelineDataSourceConfiguration dataSourceConfig;
private final PipelineDataSourceManager dataSourceManager;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index d49cba24444..cf399464508 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -20,9 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.Getter;
import lombok.NonNull;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
/**
@@ -31,28 +30,23 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
@Getter
public final class PrepareTargetTablesParameter {
- private final TaskConfiguration taskConfig;
+ private final String databaseName;
private final JobDataNodeLine tablesFirstDataNodes;
+ private final PipelineDataSourceConfiguration dataSourceConfig;
+
private final PipelineDataSourceManager dataSourceManager;
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
- public PrepareTargetTablesParameter(@NonNull final TaskConfiguration taskConfig, @NonNull final PipelineDataSourceManager dataSourceManager,
- final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
- this.taskConfig = taskConfig;
- tablesFirstDataNodes = JobDataNodeLine.unmarshal(taskConfig.getJobConfig().getTablesFirstDataNodes());
+ public PrepareTargetTablesParameter(@NonNull final String databaseName, @NonNull final PipelineDataSourceConfiguration dataSourceConfig,
+ @NonNull final PipelineDataSourceManager dataSourceManager,
+ @NonNull final String tablesFirstDataNodes, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ this.databaseName = databaseName;
+ this.dataSourceConfig = dataSourceConfig;
+ this.tablesFirstDataNodes = JobDataNodeLine.unmarshal(tablesFirstDataNodes);
this.dataSourceManager = dataSourceManager;
this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
}
-
- /**
- * Get job configuration.
- *
- * @return job configuration
- */
- public RuleAlteredJobConfiguration getJobConfig() {
- return taskConfig.getJobConfig();
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 6456409cb22..288d1cf0b85 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -17,6 +17,13 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
@@ -55,14 +62,6 @@ import org.apache.shardingsphere.infra.lock.LockScope;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
-import javax.sql.DataSource;
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
/**
* Rule altered job preparer.
*/
@@ -143,11 +142,26 @@ public final class RuleAlteredJobPreparer {
return;
}
TableNameSchemaNameMapping tableNameSchemaNameMapping = jobContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
- PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(jobContext.getTaskConfig(), jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
- dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
- PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobContext.getTaskConfig(), jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+ PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(jobConfig.splitLogicTableNames(),
+ DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()),
+ jobConfig.getDatabaseName(), jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+ if (isSourceAndTargetSchemaAvailable(jobConfig)) {
+ dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+ }
+ PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobConfig.getDatabaseName(), jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
+ jobContext.getDataSourceManager(), jobConfig.getTablesFirstDataNodes(), tableNameSchemaNameMapping);
dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
}
+
+ private boolean isSourceAndTargetSchemaAvailable(final RuleAlteredJobConfiguration jobConfig) {
+ DatabaseType sourceDatabaseType = DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType());
+ DatabaseType targetDatabaseType = DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType());
+ if (!sourceDatabaseType.isSchemaAvailable() || !targetDatabaseType.isSchemaAvailable()) {
+ log.info("prepareTargetSchemas, one of source or target database type schema is not available, ignore");
+ return false;
+ }
+ return true;
+ }
private void checkSourceDataSource(final RuleAlteredJobContext jobContext) {
DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(jobContext.getJobConfig().getSourceDatabaseType());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 54d531fdd26..5c282fbfe0d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -41,7 +41,7 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
- try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), dataSourceManager).getConnection()) {
+ try (Connection targetConnection = getTargetCachedDataSource(parameter.getDataSourceConfig(), dataSourceManager).getConnection()) {
for (String each : getCreateTableSQL(parameter)) {
executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
log.info("create target table '{}' success", each);
@@ -56,7 +56,7 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
List<String> result = new LinkedList<>();
for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- result.add(generator.generateLogicDDLSQL(new MySQLDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
+ result.add(generator.generateLogicDDLSQL(new MySQLDatabaseType(), parameter.getDatabaseName(), schemaName, each.getLogicTableName()));
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 140528e89bd..98c6d100a99 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -85,7 +85,7 @@ public final class MySQLDataSourcePreparerTest {
when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig);
when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
when(jobConfig.getTarget().getParameter()).thenReturn("target");
- when(prepareTargetTablesParameter.getJobConfig()).thenReturn(jobConfig);
+ when(prepareTargetTablesParameter.getDatabaseName()).thenReturn("test_db");
when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new JobDataNodeLine(Collections.emptyList()));
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index 32f635aba91..5ba5ff93801 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -43,7 +43,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
- try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
+ try (Connection targetConnection = getTargetCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
for (String createLogicalTableSQL : createLogicalTableSQLs) {
for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
@@ -59,7 +59,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
List<String> result = new LinkedList<>();
for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- result.add(generator.generateLogicDDLSQL(new OpenGaussDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
+ result.add(generator.generateLogicDDLSQL(new OpenGaussDatabaseType(), parameter.getDatabaseName(), schemaName, each.getLogicTableName()));
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSource [...]
index d579ca04585..bc5f72469f6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -43,7 +43,7 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
- try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
+ try (Connection targetConnection = getTargetCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
for (String createLogicalTableSQL : createLogicalTableSQLs) {
for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
executeTargetTableSQL(targetConnection, each);
@@ -59,7 +59,7 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
List<String> result = new LinkedList<>();
for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- result.add(generator.generateLogicDDLSQL(new PostgreSQLDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
+ result.add(generator.generateLogicDDLSQL(new PostgreSQLDatabaseType(), parameter.getDatabaseName(), schemaName, each.getLogicTableName()));
}
return result;
}