You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2022/01/28 02:21:33 UTC
[shardingsphere] branch master updated: Replace dataSourceFactory.newInstance to dataSourceManager.getDataSource for better performance and avoid possible connection leak (#15126)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f152d01 Replace dataSourceFactory.newInstance to dataSourceManager.getDataSource for better performance and avoid possible connection leak (#15126)
f152d01 is described below
commit f152d010d2e752308efc52664ddfe1f5f8248e54
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Jan 28 10:19:09 2022 +0800
Replace dataSourceFactory.newInstance to dataSourceManager.getDataSource for better performance and avoid possible connection leak (#15126)
* Refactor resetTargetTable
* Remove dataSourceFactory.newInstance in AbstractSingleTableDataCalculator
* Reuse dataSourceManager in incremental dumper, make sure data source will be released later
* Reuse dataSourceManager in WalEventConverter, make sure data source will be released later
* Move DataSourcePreparer
* Reuse dataSourceManager in DataSourcePreparer
* Refactor dataSourceManager for job preparer
---
.../core/api/impl/RuleAlteredJobAPIImpl.java | 2 +-
.../consistency/DataConsistencyCheckerImpl.java | 3 +--
.../core/datasource/PipelineDataSourceFactory.java | 1 -
.../core/datasource/PipelineDataSourceManager.java | 26 ++++++++++++++--------
.../ingest/dumper/AbstractIncrementalDumper.java | 4 +++-
.../datasource/AbstractDataSourcePreparer.java | 13 +++++------
.../prepare/datasource}/DataSourcePreparer.java | 4 +---
.../datasource/PrepareTargetTablesParameter.java | 6 ++++-
.../AbstractSingleTableDataCalculator.java | 9 --------
.../data/pipeline/core/task/IncrementalTask.java | 2 +-
.../rulealtered/RuleAlteredJobPreparer.java | 10 ++++-----
.../scaling/core/job/check/EnvironmentChecker.java | 2 +-
.../core/job/check/EnvironmentCheckerFactory.java | 2 +-
.../scaling/core/job/dumper/DumperFactory.java | 9 +++++---
.../job/environment/ScalingEnvironmentManager.java | 14 ++++++------
.../pipeline/mysql/MySQLEnvironmentChecker.java | 2 +-
.../mysql/ingest/MySQLIncrementalDumper.java | 9 ++++----
.../datasource/MySQLDataSourcePreparer.java | 11 +++++----
.../mysql/ingest/MySQLIncrementalDumperTest.java | 2 +-
.../datasource/MySQLDataSourcePreparerTest.java | 2 +-
.../opengauss/ingest/OpenGaussWalDumper.java | 12 +++++-----
.../datasource/OpenGaussDataSourcePreparer.java | 5 ++---
.../postgresql/PostgreSQLEnvironmentChecker.java | 2 +-
.../postgresql/ingest/PostgreSQLWalDumper.java | 8 ++++---
.../postgresql/ingest/wal/WalEventConverter.java | 6 ++---
.../postgresql/ingest/PostgreSQLWalDumperTest.java | 3 ++-
.../ingest/wal/WalEventConverterTest.java | 2 +-
.../config/rulealtered/HandleConfiguration.java | 11 +++++++++
.../core/fixture/FixtureDataSourcePreparer.java | 4 ++--
.../core/fixture/FixtureEnvironmentChecker.java | 2 +-
.../core/fixture/FixtureIncrementalDumper.java | 6 +++--
.../datasource/AbstractDataSourcePreparerTest.java | 1 -
32 files changed, 106 insertions(+), 89 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 4655a4f..9854efa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -322,7 +322,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
log.info("Scaling job {} reset target table", jobId);
PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJobProgress(jobId);
try {
- new ScalingEnvironmentManager().resetTargetTable(new RuleAlteredJobContext(getJobConfig(jobId)));
+ new ScalingEnvironmentManager().cleanupTargetTables(getJobConfig(jobId));
} catch (final SQLException ex) {
throw new PipelineJobExecutionException("Reset target table failed for job " + jobId);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 9a5693c..0e83983 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
@@ -84,7 +83,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
this.jobConfig = jobConfig;
ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
jobId = jobConfig.getHandleConfig().getJobId();
- logicTableNames = Splitter.on(',').splitToList(jobConfig.getHandleConfig().getLogicTables());
+ logicTableNames = jobConfig.getHandleConfig().splitLogicTableNames();
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
index bf4f77d..2109afc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
@@ -38,7 +38,6 @@ public final class PipelineDataSourceFactory {
*/
@SneakyThrows(SQLException.class)
public PipelineDataSourceWrapper newInstance(final PipelineDataSourceConfiguration dataSourceConfig) {
- // TODO cache and reuse, try PipelineDataSourceManager
DataSource pipelineDataSource = PipelineDataSourceCreatorFactory.getInstance(dataSourceConfig.getType()).createPipelineDataSource(dataSourceConfig.getDataSourceConfiguration());
return new PipelineDataSourceWrapper(pipelineDataSource, dataSourceConfig.getDatabaseType());
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index 38220ed..aa3f9ce 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -49,7 +49,10 @@ public final class PipelineDataSourceManager implements AutoCloseable {
* @param pipelineDataSourceConfig pipeline data source configuration
*/
public void createSourceDataSource(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
- PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(pipelineDataSourceConfig);
+ if (cachedDataSources.containsKey(pipelineDataSourceConfig)) {
+ return;
+ }
+ PipelineDataSourceWrapper dataSource = getDataSource(pipelineDataSourceConfig);
cachedDataSources.put(pipelineDataSourceConfig, dataSource);
sourceDataSources.put(pipelineDataSourceConfig, dataSource);
}
@@ -60,27 +63,31 @@ public final class PipelineDataSourceManager implements AutoCloseable {
* @param pipelineDataSourceConfig pipeline data source configuration
*/
public void createTargetDataSource(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
- PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(pipelineDataSourceConfig);
+ if (cachedDataSources.containsKey(pipelineDataSourceConfig)) {
+ return;
+ }
+ PipelineDataSourceWrapper dataSource = getDataSource(pipelineDataSourceConfig);
cachedDataSources.put(pipelineDataSourceConfig, dataSource);
targetDataSources.put(pipelineDataSourceConfig, dataSource);
}
/**
- * Get data source.
+ * Get cached data source.
*
* @param dataSourceConfig data source configuration
* @return data source
*/
public PipelineDataSourceWrapper getDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
- // TODO re-init if existing dataSource was closed
- if (cachedDataSources.containsKey(dataSourceConfig)) {
- return cachedDataSources.get(dataSourceConfig);
+ PipelineDataSourceWrapper result = cachedDataSources.get(dataSourceConfig);
+ if (null != result) {
+ return result;
}
synchronized (cachedDataSources) {
- if (cachedDataSources.containsKey(dataSourceConfig)) {
- return cachedDataSources.get(dataSourceConfig);
+ result = cachedDataSources.get(dataSourceConfig);
+ if (null != result) {
+ return result;
}
- PipelineDataSourceWrapper result = dataSourceFactory.newInstance(dataSourceConfig);
+ result = dataSourceFactory.newInstance(dataSourceConfig);
cachedDataSources.put(dataSourceConfig, result);
return result;
}
@@ -100,5 +107,6 @@ public final class PipelineDataSourceManager implements AutoCloseable {
}
cachedDataSources.clear();
sourceDataSources.clear();
+ targetDataSources.clear();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
index 6e326e2..eba47d3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
/**
@@ -30,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
*/
public abstract class AbstractIncrementalDumper<P> extends AbstractLifecycleExecutor implements IncrementalDumper {
- public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P> position, final PipelineChannel channel) {
+ public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P> position,
+ final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 9bf2f2b..dba0a18 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -23,8 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import java.sql.Connection;
import java.sql.SQLException;
@@ -48,14 +47,12 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
- private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-
- protected final PipelineDataSourceWrapper getSourceDataSource(final PipelineConfiguration pipelineConfig) {
- return dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter()));
+ protected final PipelineDataSourceWrapper getSourceCachedDataSource(final PipelineConfiguration pipelineConfig, final PipelineDataSourceManager dataSourceManager) {
+ return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter()));
}
- protected final PipelineDataSourceWrapper getTargetDataSource(final PipelineConfiguration pipelineConfig) {
- return dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter()));
+ protected final PipelineDataSourceWrapper getTargetCachedDataSource(final PipelineConfiguration pipelineConfig, final PipelineDataSourceManager dataSourceManager) {
+ return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter()));
}
protected final void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/DataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
similarity index 86%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/DataSourcePreparer.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
index 1d9ccb1..4108e63 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/DataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
@@ -15,9 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
+package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
/**
* Data source preparer.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
similarity index 84%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index 3b1b202..c24bce3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.prepare.datasource;
+package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
/**
* Prepare target tables parameter.
@@ -35,4 +36,7 @@ public final class PrepareTargetTablesParameter {
@NonNull
private final PipelineConfiguration pipelineConfiguration;
+
+ @NonNull
+ private final PipelineDataSourceManager dataSourceManager;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
index e7bdf57..a955df5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
@@ -17,9 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
import java.util.Properties;
@@ -29,14 +26,8 @@ import java.util.Properties;
*/
public abstract class AbstractSingleTableDataCalculator implements SingleTableDataCalculator {
- private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-
private Properties algorithmProps;
- protected final PipelineDataSourceWrapper getDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
- return dataSourceFactory.newInstance(dataSourceConfig);
- }
-
@Override
public Properties getAlgorithmProps() {
return algorithmProps;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 166e689..f0a8c9b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -76,7 +76,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
IngestPosition<?> position = dumperConfig.getPosition();
progress.setPosition(position);
channel = createChannel(concurrency, pipelineChannelFactory, progress);
- dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel);
+ dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, dataSourceManager, channel);
importers = createImporters(concurrency, importerConfig, dataSourceManager, channel);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index a298bb4..0bad421 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -24,17 +24,17 @@ import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
@@ -57,7 +57,7 @@ public final class RuleAlteredJobPreparer {
*/
public void prepare(final RuleAlteredJobContext jobContext) {
PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
- prepareTarget(jobContext.getJobConfig());
+ prepareTarget(jobContext.getJobConfig(), dataSourceManager);
try {
initDataSourceManager(dataSourceManager, jobContext.getTaskConfig());
checkDataSource(jobContext, dataSourceManager);
@@ -71,14 +71,14 @@ public final class RuleAlteredJobPreparer {
}
}
- private void prepareTarget(final JobConfiguration jobConfig) {
+ private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
DataSourcePreparer dataSourcePreparer = EnvironmentCheckerFactory.getDataSourcePreparer(jobConfig.getHandleConfig().getTargetDatabaseType());
if (null == dataSourcePreparer) {
log.info("dataSourcePreparer null, ignore prepare target");
return;
}
JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
- PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getPipelineConfig());
+ PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getPipelineConfig(), dataSourceManager);
dataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentChecker.java
index 4cc14ce..a65a00c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentChecker.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.scaling.core.job.check;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
/**
* Environment checker.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentCheckerFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentCheckerFactory.java
index fc1a672..68d8063 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentCheckerFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentCheckerFactory.java
@@ -21,8 +21,8 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckerImpl;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
index 33e51e9..eed54f6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
@@ -59,14 +59,17 @@ public final class DumperFactory {
*
* @param dumperConfig dumper configuration
* @param position position
+ * @param dataSourceManager data source manager
* @param channel channel
* @return incremental dumper
*/
@SneakyThrows(ReflectiveOperationException.class)
- public static IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<?> position, final PipelineChannel channel) {
+ public static IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<?> position,
+ final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getName();
ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(databaseType);
- Constructor<? extends IncrementalDumper> constructor = scalingEntry.getIncrementalDumperClass().getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineChannel.class);
- return constructor.newInstance(dumperConfig, position, channel);
+ Constructor<? extends IncrementalDumper> constructor = scalingEntry.getIncrementalDumperClass()
+ .getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineDataSourceManager.class, PipelineChannel.class);
+ return constructor.newInstance(dumperConfig, position, dataSourceManager, channel);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index 70d8d14..9c5e733 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -17,12 +17,12 @@
package org.apache.shardingsphere.scaling.core.job.environment;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -37,19 +37,19 @@ public final class ScalingEnvironmentManager {
private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
/**
- * Reset target table.
+ * Cleanup target tables.
*
- * @param jobContext job context
+ * @param jobConfig job configuration
* @throws SQLException SQL exception
*/
// TODO seems it should be removed, dangerous to use
- public void resetTargetTable(final RuleAlteredJobContext jobContext) throws SQLException {
- Collection<String> tables = jobContext.getTaskConfig().getDumperConfig().getTableNameMap().values();
- YamlPipelineDataSourceConfiguration target = jobContext.getJobConfig().getPipelineConfig().getTarget();
+ public void cleanupTargetTables(final JobConfiguration jobConfig) throws SQLException {
+ Collection<String> tables = jobConfig.getHandleConfig().splitLogicTableNames();
+ YamlPipelineDataSourceConfiguration target = jobConfig.getPipelineConfig().getTarget();
try (PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
Connection connection = dataSource.getConnection()) {
for (String each : tables) {
- String sql = PipelineSQLBuilderFactory.getSQLBuilder(jobContext.getJobConfig().getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
+ String sql = PipelineSQLBuilderFactory.getSQLBuilder(jobConfig.getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLEnvironmentChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLEnvironmentChecker.java
index 7df5d4c..0b74019 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLEnvironmentChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLEnvironmentChecker.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.mysql;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.mysql.check.datasource.MySQLDataSourceChecker;
import org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource.MySQLDataSourcePreparer;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 5395875..b87c985 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
@@ -78,13 +78,14 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
VALUE_HANDLER_MAP = SingletonSPIRegistry.getSingletonInstancesMap(ValueHandler.class, ValueHandler::getTypeName);
}
- public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition, final PipelineChannel channel) {
- super(dumperConfig, binlogPosition, channel);
+ public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
+ final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ super(dumperConfig, binlogPosition, dataSourceManager, channel);
this.binlogPosition = (BinlogPosition) binlogPosition;
this.dumperConfig = dumperConfig;
Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
this.channel = channel;
- columnMetaDataLoader = new MySQLColumnMetaDataLoader(new PipelineDataSourceFactory().newInstance(dumperConfig.getDataSourceConfig()));
+ columnMetaDataLoader = new MySQLColumnMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 20667db..5677ec4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -20,10 +20,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
@@ -45,10 +45,9 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
PipelineConfiguration pipelineConfig = parameter.getPipelineConfiguration();
- try (PipelineDataSourceWrapper sourceDataSource = getSourceDataSource(pipelineConfig);
- Connection sourceConnection = sourceDataSource.getConnection();
- PipelineDataSourceWrapper targetDataSource = getTargetDataSource(pipelineConfig);
- Connection targetConnection = targetDataSource.getConnection()) {
+ PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
+ try (Connection sourceConnection = getSourceCachedDataSource(pipelineConfig, dataSourceManager).getConnection();
+ Connection targetConnection = getTargetCachedDataSource(pipelineConfig, dataSourceManager).getConnection()) {
Collection<String> logicTableNames = parameter.getTablesFirstDataNodes().getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList());
for (String each : logicTableNames) {
String createTableSQL = getCreateTableSQL(sourceConnection, each);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index c732e56..1b44d6a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -61,7 +61,7 @@ public final class MySQLIncrementalDumperTest {
DumperConfiguration dumperConfig = mockDumperConfiguration();
initTableData(dumperConfig);
channel = new MultiplexMemoryPipelineChannel();
- incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel);
+ incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), new PipelineDataSourceManager(), channel);
}
private DumperConfiguration mockDumperConfiguration() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index ef7e7e7..dde7537 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -21,9 +21,9 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineCo
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 5c0c601..a91ba15 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -20,11 +20,12 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
@@ -36,7 +37,6 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.PGReplicationStream;
@@ -49,7 +49,7 @@ import java.sql.SQLException;
* OpenGauss WAL dumper.
*/
@Slf4j
-public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosition> {
private final WalPosition walPosition;
@@ -63,14 +63,16 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
private final PipelineChannel channel;
- public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position, final PipelineChannel channel) {
+ public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+ final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ super(dumperConfig, position, dataSourceManager, channel);
walPosition = (WalPosition) position;
if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
}
this.dumperConfig = dumperConfig;
this.channel = channel;
- walEventConverter = new WalEventConverter(dumperConfig);
+ walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index e72c557..f70538a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -23,11 +23,11 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.infra.datanode.DataNode;
import java.sql.Connection;
@@ -60,8 +60,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
throw new PipelineJobPrepareFailedException("get table definitions failed.", ex);
}
Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
- try (PipelineDataSourceWrapper targetDataSource = getTargetDataSource(parameter.getPipelineConfiguration());
- Connection targetConnection = targetDataSource.getConnection()) {
+ try (Connection targetConnection = getTargetCachedDataSource(parameter.getPipelineConfiguration(), parameter.getDataSourceManager()).getConnection()) {
for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
for (String each : entry.getValue()) {
executeTargetTableSQL(targetConnection, each);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLEnvironmentChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLEnvironmentChecker.java
index ad86d45..49f67a3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLEnvironmentChecker.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.data.pipeline.postgresql;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.postgresql.check.datasource.PostgreSQLDataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 7a0acd7..4ae61bc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -58,15 +59,16 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
private final PipelineChannel channel;
- public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position, final PipelineChannel channel) {
- super(dumperConfig, position, channel);
+ public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+ final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ super(dumperConfig, position, dataSourceManager, channel);
walPosition = (WalPosition) position;
if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
}
this.dumperConfig = dumperConfig;
this.channel = channel;
- walEventConverter = new WalEventConverter(dumperConfig);
+ walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index 38c3155..c660ed0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineMetaDataManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
@@ -47,10 +47,10 @@ public final class WalEventConverter {
private final PipelineMetaDataManager metaDataManager;
- public WalEventConverter(final DumperConfiguration dumperConfig) {
+ public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager) {
this.dumperConfig = dumperConfig;
databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
- metaDataManager = new PipelineMetaDataManager(new PipelineDataSourceFactory().newInstance(dumperConfig.getDataSourceConfig()));
+ metaDataManager = new PipelineMetaDataManager(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index fdbac8a..c54da70 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
@@ -70,7 +71,7 @@ public final class PostgreSQLWalDumperTest {
public void setUp() {
position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
channel = new MultiplexMemoryPipelineChannel();
- walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position, channel);
+ walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position, new PipelineDataSourceManager(), channel);
}
private DumperConfiguration mockDumperConfiguration() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index 53e225c..1a18864 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -51,7 +51,7 @@ public final class WalEventConverterTest {
@Before
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
- walEventConverter = new WalEventConverter(dumperConfig);
+ walEventConverter = new WalEventConverter(dumperConfig, new PipelineDataSourceManager());
initTableData(dumperConfig);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
index efe69a3..feebbb3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
@@ -17,11 +17,13 @@
package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+import com.google.common.base.Splitter;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
+import java.util.Collection;
import java.util.List;
/**
@@ -72,4 +74,13 @@ public final class HandleConfiguration {
public int getJobShardingCount() {
return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
}
+
+ /**
+ * Split {@linkplain #logicTables} to logic table names.
+ *
+ * @return logic table names
+ */
+ public Collection<String> splitLogicTableNames() {
+ return Splitter.on(',').splitToList(logicTables);
+ }
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataSourcePreparer.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataSourcePreparer.java
index eb6c277..b086621 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataSourcePreparer.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataSourcePreparer.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
public final class FixtureDataSourcePreparer implements DataSourcePreparer {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureEnvironmentChecker.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureEnvironmentChecker.java
index 4afe4a1..b540ae4 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureEnvironmentChecker.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureEnvironmentChecker.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
public final class FixtureEnvironmentChecker implements EnvironmentChecker {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index 096d6b8..32bec41 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -21,12 +21,14 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<FinishedPosition> {
- public FixtureIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position, final PipelineChannel channel) {
- super(dumperConfig, position, channel);
+ public FixtureIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
+ final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ super(dumperConfig, position, dataSourceManager, channel);
}
@Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
index a80e96a..c2eb2a1 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
import org.junit.Test;