You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/07/27 09:11:32 UTC

[shardingsphere] branch master updated: Remove TaskConfiguration reference from PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#19455)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new da9e563facc Remove TaskConfiguration reference from PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#19455)
da9e563facc is described below

commit da9e563faccf2344ed250ca64a3caea783c95ebb
Author: Da Xiang Huang <lo...@foxmail.com>
AuthorDate: Wed Jul 27 17:11:25 2022 +0800

    Remove TaskConfiguration reference from PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#19455)
---
 .../datasource/AbstractDataSourcePreparer.java     | 35 ++++++++-------------
 .../datasource/PrepareTargetSchemasParameter.java  | 12 ++++++--
 .../datasource/PrepareTargetTablesParameter.java   | 26 ++++++----------
 .../rulealtered/RuleAlteredJobPreparer.java        | 36 +++++++++++++++-------
 .../datasource/MySQLDataSourcePreparer.java        |  4 +--
 .../datasource/MySQLDataSourcePreparerTest.java    |  2 +-
 .../datasource/OpenGaussDataSourcePreparer.java    |  4 +--
 .../datasource/PostgreSQLDataSourcePreparer.java   |  4 +--
 8 files changed, 65 insertions(+), 58 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index abf908c50ac..a5bde33ac74 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,25 +17,22 @@
 
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
 
 /**
  * Abstract data source preparer.
@@ -51,17 +48,11 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
     
     @Override
     public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) {
-        DatabaseType sourceDatabaseType = DatabaseTypeFactory.getInstance(parameter.getTaskConfig().getJobConfig().getSourceDatabaseType());
-        DatabaseType targetDatabaseType = DatabaseTypeFactory.getInstance(parameter.getTaskConfig().getJobConfig().getTargetDatabaseType());
-        if (!sourceDatabaseType.isSchemaAvailable() || !targetDatabaseType.isSchemaAvailable()) {
-            log.info("prepareTargetSchemas, one of source or target database type schema is not available, ignore");
-            return;
-        }
         Set<String> schemaNames = getSchemaNames(parameter);
-        String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType, parameter.getTaskConfig().getJobConfig().getDatabaseName());
+        String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(parameter.getTargetDatabaseType(), parameter.getDatabaseName());
         log.info("prepareTargetSchemas, schemaNames={}, defaultSchema={}", schemaNames, defaultSchema);
-        PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(targetDatabaseType.getType());
-        try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
+        PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getTargetDatabaseType().getType());
+        try (Connection targetConnection = getTargetCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
             for (String each : schemaNames) {
                 if (each.equalsIgnoreCase(defaultSchema)) {
                     continue;
@@ -80,7 +71,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
     
     private Set<String> getSchemaNames(final PrepareTargetSchemasParameter parameter) {
         Set<String> result = new HashSet<>();
-        for (String each : parameter.getTaskConfig().getJobConfig().splitLogicTableNames()) {
+        for (String each : parameter.getLogicTableNames()) {
             String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each);
             if (null == schemaName) {
                 throw new PipelineJobPrepareFailedException("Can not get schemaName by logic table name " + each);
@@ -95,8 +86,8 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
     }
     
-    protected final PipelineDataSourceWrapper getTargetCachedDataSource(final TaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) {
-        return dataSourceManager.getDataSource(taskConfig.getImporterConfig().getDataSourceConfig());
+    protected final PipelineDataSourceWrapper getTargetCachedDataSource(final PipelineDataSourceConfiguration dataSourceConfig, final PipelineDataSourceManager dataSourceManager) {
+        return dataSourceManager.getDataSource(dataSourceConfig);
     }
     
     protected final void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
index b19cd0afd7b..5c4a4b382bc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
@@ -17,11 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
+import java.util.List;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
 /**
  * Prepare target schemas parameter.
@@ -30,7 +32,13 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 @Getter
 public final class PrepareTargetSchemasParameter {
     
-    private final TaskConfiguration taskConfig;
+    private final List<String> logicTableNames;
+    
+    private final DatabaseType targetDatabaseType;
+    
+    private final String databaseName;
+    
+    private final PipelineDataSourceConfiguration dataSourceConfig;
     
     private final PipelineDataSourceManager dataSourceManager;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index d49cba24444..cf399464508 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -20,9 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 import lombok.Getter;
 import lombok.NonNull;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 
 /**
@@ -31,28 +30,23 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 @Getter
 public final class PrepareTargetTablesParameter {
     
-    private final TaskConfiguration taskConfig;
+    private final String databaseName;
     
     private final JobDataNodeLine tablesFirstDataNodes;
     
+    private final PipelineDataSourceConfiguration dataSourceConfig;
+    
     private final PipelineDataSourceManager dataSourceManager;
     
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
-    public PrepareTargetTablesParameter(@NonNull final TaskConfiguration taskConfig, @NonNull final PipelineDataSourceManager dataSourceManager,
-                                        final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
-        this.taskConfig = taskConfig;
-        tablesFirstDataNodes = JobDataNodeLine.unmarshal(taskConfig.getJobConfig().getTablesFirstDataNodes());
+    public PrepareTargetTablesParameter(@NonNull final String databaseName, @NonNull final PipelineDataSourceConfiguration dataSourceConfig,
+                                        @NonNull final PipelineDataSourceManager dataSourceManager,
+                                        @NonNull final String tablesFirstDataNodes, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        this.databaseName = databaseName;
+        this.dataSourceConfig = dataSourceConfig;
+        this.tablesFirstDataNodes = JobDataNodeLine.unmarshal(tablesFirstDataNodes);
         this.dataSourceManager = dataSourceManager;
         this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
     }
-    
-    /**
-     * Get job configuration.
-     *
-     * @return job configuration
-     */
-    public RuleAlteredJobConfiguration getJobConfig() {
-        return taskConfig.getJobConfig();
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 6456409cb22..288d1cf0b85 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -17,6 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
@@ -55,14 +62,6 @@ import org.apache.shardingsphere.infra.lock.LockScope;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
 
-import javax.sql.DataSource;
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Rule altered job preparer.
  */
@@ -143,11 +142,26 @@ public final class RuleAlteredJobPreparer {
             return;
         }
         TableNameSchemaNameMapping tableNameSchemaNameMapping = jobContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
-        PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(jobContext.getTaskConfig(), jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
-        dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
-        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobContext.getTaskConfig(), jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+        PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(jobConfig.splitLogicTableNames(),
+                DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()),
+                jobConfig.getDatabaseName(), jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+        if (isSourceAndTargetSchemaAvailable(jobConfig)) {
+            dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+        }
+        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobConfig.getDatabaseName(), jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
+                jobContext.getDataSourceManager(), jobConfig.getTablesFirstDataNodes(), tableNameSchemaNameMapping);
         dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
     }
+
+    private boolean isSourceAndTargetSchemaAvailable(final RuleAlteredJobConfiguration jobConfig) {
+        DatabaseType sourceDatabaseType = DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType());
+        DatabaseType targetDatabaseType = DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType());
+        if (!sourceDatabaseType.isSchemaAvailable() || !targetDatabaseType.isSchemaAvailable()) {
+            log.info("prepareTargetSchemas, one of source or target database type schema is not available, ignore");
+            return false;
+        }
+        return true;
+    }
     
     private void checkSourceDataSource(final RuleAlteredJobContext jobContext) {
         DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(jobContext.getJobConfig().getSourceDatabaseType());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 54d531fdd26..5c282fbfe0d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -41,7 +41,7 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
         PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
-        try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), dataSourceManager).getConnection()) {
+        try (Connection targetConnection = getTargetCachedDataSource(parameter.getDataSourceConfig(), dataSourceManager).getConnection()) {
             for (String each : getCreateTableSQL(parameter)) {
                 executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
                 log.info("create target table '{}' success", each);
@@ -56,7 +56,7 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
         List<String> result = new LinkedList<>();
         for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
             String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            result.add(generator.generateLogicDDLSQL(new MySQLDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
+            result.add(generator.generateLogicDDLSQL(new MySQLDatabaseType(), parameter.getDatabaseName(), schemaName, each.getLogicTableName()));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 140528e89bd..98c6d100a99 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -85,7 +85,7 @@ public final class MySQLDataSourcePreparerTest {
         when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig);
         when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
         when(jobConfig.getTarget().getParameter()).thenReturn("target");
-        when(prepareTargetTablesParameter.getJobConfig()).thenReturn(jobConfig);
+        when(prepareTargetTablesParameter.getDatabaseName()).thenReturn("test_db");
         when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new JobDataNodeLine(Collections.emptyList()));
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index 32f635aba91..5ba5ff93801 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -43,7 +43,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
         List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
-        try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
+        try (Connection targetConnection = getTargetCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
             for (String createLogicalTableSQL : createLogicalTableSQLs) {
                 for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
                     executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
@@ -59,7 +59,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
         List<String> result = new LinkedList<>();
         for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
             String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            result.add(generator.generateLogicDDLSQL(new OpenGaussDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
+            result.add(generator.generateLogicDDLSQL(new OpenGaussDatabaseType(), parameter.getDatabaseName(), schemaName, each.getLogicTableName()));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
index d579ca04585..bc5f72469f6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -43,7 +43,7 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
         List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
-        try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
+        try (Connection targetConnection = getTargetCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
             for (String createLogicalTableSQL : createLogicalTableSQLs) {
                 for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
                     executeTargetTableSQL(targetConnection, each);
@@ -59,7 +59,7 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
         List<String> result = new LinkedList<>();
         for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
             String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            result.add(generator.generateLogicDDLSQL(new PostgreSQLDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
+            result.add(generator.generateLogicDDLSQL(new PostgreSQLDatabaseType(), parameter.getDatabaseName(), schemaName, each.getLogicTableName()));
         }
         return result;
     }