You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/01/30 02:09:55 UTC
[shardingsphere] branch master updated: Reuse sourceDataSource and sourceMetaDataLoader in scaling job process (#15190)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e915749 Reuse sourceDataSource and sourceMetaDataLoader in scaling job process (#15190)
e915749 is described below
commit e91574954dca0ee449d62511b29f591e3d54abc8
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sun Jan 30 10:08:02 2022 +0800
Reuse sourceDataSource and sourceMetaDataLoader in scaling job process (#15190)
* Simplify parameters and fields
* Initialize sourceDataSource and sourceMetaDataLoader in job context
* Reuse sourceDataSource and sourceMetaDataLoader in job process
* Unit test
---
.../ingest/dumper/AbstractIncrementalDumper.java | 4 +--
.../ingest/dumper/AbstractInventoryDumper.java | 39 +++++++++++++---------
.../data/pipeline/core/task/IncrementalTask.java | 10 +++---
.../data/pipeline/core/task/InventoryTask.java | 10 +++---
.../rulealtered/RuleAlteredJobContext.java | 39 ++++++++++++++++++++++
.../rulealtered/RuleAlteredJobPreparer.java | 15 +++++----
.../rulealtered/prepare/InventoryTaskSplitter.java | 18 +++++-----
.../scaling/core/job/dumper/DumperFactory.java | 21 +++++++-----
.../mysql/ingest/MySQLIncrementalDumper.java | 7 ++--
.../mysql/ingest/MySQLInventoryDumper.java | 8 +++--
.../mysql/ingest/MySQLIncrementalDumperTest.java | 12 ++++++-
.../pipeline/mysql/ingest/MySQLJdbcDumperTest.java | 21 ++++++------
.../opengauss/ingest/OpenGaussWalDumper.java | 8 ++---
.../ingest/PostgreSQLInventoryDumper.java | 8 +++--
.../postgresql/ingest/PostgreSQLWalDumper.java | 8 ++---
.../postgresql/ingest/wal/WalEventConverter.java | 5 ++-
.../ingest/PostgreSQLJdbcDumperTest.java | 17 ++++++----
.../postgresql/ingest/PostgreSQLWalDumperTest.java | 25 ++++++++++----
.../ingest/wal/WalEventConverterTest.java | 11 +++++-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 10 ++++--
.../core/fixture/FixtureIncrementalDumper.java | 6 ++--
.../core/fixture/FixtureInventoryDumper.java | 8 +++--
.../pipeline/core/task/IncrementalTaskTest.java | 6 +++-
.../data/pipeline/core/task/InventoryTaskTest.java | 17 ++++++++--
.../prepare/InventoryTaskSplitterTest.java | 12 +++----
25 files changed, 228 insertions(+), 117 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
index eba47d3..f36e16e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
/**
@@ -32,6 +32,6 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
public abstract class AbstractIncrementalDumper<P> extends AbstractLifecycleExecutor implements IncrementalDumper {
public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P> position,
- final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 063941a..b537ee1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -19,9 +19,11 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
import lombok.AccessLevel;
import lombok.Getter;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
@@ -34,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -42,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTable
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -62,29 +64,28 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
private final JobRateLimitAlgorithm rateLimitAlgorithm;
- private final PipelineDataSourceManager dataSourceManager;
-
- private final PipelineTableMetaData tableMetaData;
+ private final LazyInitializer<PipelineTableMetaData> tableMetaDataLazyInitializer;
private final PipelineChannel channel;
- protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ private final DataSource dataSource;
+
+ protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
+ final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
if (!StandardPipelineDataSourceConfiguration.class.equals(inventoryDumperConfig.getDataSourceConfig().getClass())) {
throw new UnsupportedOperationException("AbstractInventoryDumper only support StandardPipelineDataSourceConfiguration");
}
this.inventoryDumperConfig = inventoryDumperConfig;
this.batchSize = inventoryDumperConfig.getBatchSize();
this.rateLimitAlgorithm = inventoryDumperConfig.getRateLimitAlgorithm();
- this.dataSourceManager = dataSourceManager;
+ tableMetaDataLazyInitializer = new LazyInitializer<PipelineTableMetaData>() {
+ @Override
+ protected PipelineTableMetaData initialize() {
+ return metaDataLoader.getTableMetaData(inventoryDumperConfig.getTableName());
+ }
+ };
this.channel = channel;
- tableMetaData = createTableMetaData();
- }
-
- private PipelineTableMetaData createTableMetaData() {
- PipelineDataSourceConfiguration dataSourceConfig = inventoryDumperConfig.getDataSourceConfig();
- // TODO share PipelineTableMetaDataLoader
- PipelineTableMetaDataLoader metaDataManager = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dataSourceConfig));
- return metaDataManager.getTableMetaData(inventoryDumperConfig.getTableName());
+ this.dataSource = dataSource;
}
@Override
@@ -96,7 +97,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
String sql = getDumpSQL();
IngestPosition<?> position = inventoryDumperConfig.getPosition();
log.info("inventory dump, sql={}, position={}", sql, position);
- try (Connection conn = dataSourceManager.getDataSource(inventoryDumperConfig.getDataSourceConfig()).getConnection()) {
+ try (Connection conn = dataSource.getConnection()) {
int round = 1;
Number startUniqueKeyValue = getPositionBeginValue(position) - 1;
Optional<Number> maxUniqueKeyValue;
@@ -123,10 +124,16 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
return "SELECT * FROM " + tableName + " WHERE " + primaryKey + " > ? AND " + primaryKey + " <= ? ORDER BY " + primaryKey + " ASC LIMIT ?";
}
+ @SneakyThrows(ConcurrentException.class)
+ private PipelineTableMetaData getTableMetaData() {
+ return tableMetaDataLazyInitializer.get();
+ }
+
private Optional<Number> dump0(final Connection conn, final String sql, final Number startUniqueKeyValue, final int round) throws SQLException {
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
+ PipelineTableMetaData tableMetaData = getTableMetaData();
try (PreparedStatement preparedStatement = createPreparedStatement(conn, sql)) {
preparedStatement.setObject(1, startUniqueKeyValue);
preparedStatement.setObject(2, getPositionEndValue(inventoryDumperConfig.getPosition()));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index f0a8c9b..a40065e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
@@ -47,7 +48,7 @@ import java.util.concurrent.Future;
* Incremental task.
*/
@Slf4j
-@ToString(exclude = {"incrementalDumperExecuteEngine", "dataSourceManager", "dumper", "progress"})
+@ToString(exclude = {"incrementalDumperExecuteEngine", "channel", "dumper", "importers", "progress"})
public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {
@Getter
@@ -55,8 +56,6 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
private final ExecuteEngine incrementalDumperExecuteEngine;
- private final PipelineDataSourceManager dataSourceManager;
-
private final PipelineChannel channel;
private final Dumper dumper;
@@ -68,15 +67,14 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelFactory pipelineChannelFactory, final PipelineDataSourceManager dataSourceManager,
- final ExecuteEngine incrementalDumperExecuteEngine) {
+ final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine) {
this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
- this.dataSourceManager = dataSourceManager;
taskId = dumperConfig.getDataSourceName();
progress = new IncrementalTaskProgress();
IngestPosition<?> position = dumperConfig.getPosition();
progress.setPosition(position);
channel = createChannel(concurrency, pipelineChannelFactory, progress);
- dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, dataSourceManager, channel);
+ dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
importers = createImporters(concurrency, importerConfig, dataSourceManager, channel);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index ffd0f97..b4d814b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -32,12 +32,14 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
+import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -46,7 +48,7 @@ import java.util.concurrent.Future;
* Inventory task.
*/
@Slf4j
-@ToString(exclude = {"importerExecuteEngine", "dataSourceManager", "dumper"})
+@ToString(exclude = {"importerExecuteEngine", "channel", "dumper", "importer"})
public final class InventoryTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {
@Getter
@@ -54,8 +56,6 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
private final ExecuteEngine importerExecuteEngine;
- private final PipelineDataSourceManager dataSourceManager;
-
private final PipelineChannel channel;
private final Dumper dumper;
@@ -66,12 +66,12 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelFactory pipelineChannelFactory, final PipelineDataSourceManager dataSourceManager,
+ final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader,
final ExecuteEngine importerExecuteEngine) {
this.importerExecuteEngine = importerExecuteEngine;
- this.dataSourceManager = dataSourceManager;
taskId = generateTaskId(inventoryDumperConfig);
channel = createChannel(pipelineChannelFactory);
- dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, dataSourceManager, channel);
+ dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel);
position = inventoryDumperConfig.getPosition();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 4612f5c..32562af 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -19,12 +19,17 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.Getter;
import lombok.Setter;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -60,6 +65,20 @@ public final class RuleAlteredJobContext {
private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+ private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
+ @Override
+ protected PipelineDataSourceWrapper initialize() {
+ return dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
+ }
+ };
+
+ private final LazyInitializer<PipelineTableMetaDataLoader> sourceMetaDataLoaderLazyInitializer = new LazyInitializer<PipelineTableMetaDataLoader>() {
+ @Override
+ protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
+ return new PipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
+ }
+ };
+
private RuleAlteredJobPreparer jobPreparer;
public RuleAlteredJobContext(final JobConfiguration jobConfig) {
@@ -72,6 +91,26 @@ public final class RuleAlteredJobContext {
}
/**
+ * Get source data source.
+ *
+ * @return source data source
+ */
+ @SneakyThrows(ConcurrentException.class)
+ public PipelineDataSourceWrapper getSourceDataSource() {
+ return sourceDataSourceLazyInitializer.get();
+ }
+
+ /**
+ * Get source metadata loader.
+ *
+ * @return source metadata loader
+ */
+ @SneakyThrows(ConcurrentException.class)
+ public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
+ return sourceMetaDataLoaderLazyInitializer.get();
+ }
+
+ /**
* Release resources.
*/
public void close() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index fa95de7..e756f4b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -64,8 +65,8 @@ public final class RuleAlteredJobPreparer {
prepareTarget(jobContext.getJobConfig(), dataSourceManager);
initAndCheckDataSource(jobContext);
try {
- initIncrementalTasks(jobContext, dataSourceManager);
- initInventoryTasks(jobContext, dataSourceManager);
+ initIncrementalTasks(jobContext);
+ initInventoryTasks(jobContext);
log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
jobContext.getJobId(), jobContext.getShardingItem(), jobContext.getInventoryTasks(), jobContext.getIncrementalTasks());
} catch (final SQLException ex) {
@@ -112,18 +113,20 @@ public final class RuleAlteredJobPreparer {
dataSourceChecker.checkTargetTable(targetDataSources, jobContext.getTaskConfig().getImporterConfig().getShardingColumnsMap().keySet());
}
- private void initInventoryTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) {
- List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+ private void initInventoryTasks(final RuleAlteredJobContext jobContext) {
+ List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobContext);
jobContext.getInventoryTasks().addAll(allInventoryTasks);
}
- private void initIncrementalTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
+ private void initIncrementalTasks(final RuleAlteredJobContext jobContext) throws SQLException {
PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory();
ExecuteEngine incrementalDumperExecuteEngine = jobContext.getRuleAlteredContext().getIncrementalDumperExecuteEngine();
TaskConfiguration taskConfig = jobContext.getTaskConfig();
+ PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
+ PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getHandleConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
- pipelineChannelFactory, dataSourceManager, incrementalDumperExecuteEngine);
+ pipelineChannelFactory, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine);
jobContext.getIncrementalTasks().add(incrementalTask);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 757b754..2a9ff69 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
@@ -65,25 +64,26 @@ public final class InventoryTaskSplitter {
* Split inventory data to multi-tasks.
*
* @param jobContext job context
- * @param dataSourceManager data source manager
* @return split inventory data task
*/
- public List<InventoryTask> splitInventoryData(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) {
+ public List<InventoryTask> splitInventoryData(final RuleAlteredJobContext jobContext) {
List<InventoryTask> result = new LinkedList<>();
TaskConfiguration taskConfig = jobContext.getTaskConfig();
PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory();
+ PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
+ DataSource dataSource = jobContext.getSourceDataSource();
+ PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader();
ExecuteEngine importerExecuteEngine = jobContext.getRuleAlteredContext().getImporterExecuteEngine();
- for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig(), dataSourceManager)) {
- result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelFactory, dataSourceManager, importerExecuteEngine));
+ for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig())) {
+ result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelFactory, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine));
}
return result;
}
- private Collection<InventoryDumperConfiguration> splitDumperConfig(
- final RuleAlteredJobContext jobContext, final DumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager) {
+ private Collection<InventoryDumperConfiguration> splitDumperConfig(final RuleAlteredJobContext jobContext, final DumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
- PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+ DataSource dataSource = jobContext.getSourceDataSource();
+ PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader();
for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
result.addAll(splitByPrimaryKey(jobContext, dataSource, metaDataLoader, each));
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
index eed54f6..3ecacf1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
@@ -24,12 +24,13 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
+import javax.sql.DataSource;
import java.lang.reflect.Constructor;
/**
@@ -42,16 +43,18 @@ public final class DumperFactory {
* Create inventory dumper.
*
* @param inventoryDumperConfig inventory dumper configuration
- * @param dataSourceManager data source factory
* @param channel channel
+ * @param sourceDataSource data source
+ * @param sourceMetaDataLoader metadata loader
* @return inventory dumper
*/
@SneakyThrows(ReflectiveOperationException.class)
- public static InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ public static InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
+ final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader) {
ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getName());
Constructor<? extends InventoryDumper> constructor = scalingEntry.getInventoryDumperClass()
- .getConstructor(InventoryDumperConfiguration.class, PipelineDataSourceManager.class, PipelineChannel.class);
- return constructor.newInstance(inventoryDumperConfig, dataSourceManager, channel);
+ .getConstructor(InventoryDumperConfiguration.class, PipelineChannel.class, DataSource.class, PipelineTableMetaDataLoader.class);
+ return constructor.newInstance(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
}
/**
@@ -59,17 +62,17 @@ public final class DumperFactory {
*
* @param dumperConfig dumper configuration
* @param position position
- * @param dataSourceManager data source manager
* @param channel channel
+ * @param sourceMetaDataLoader metadata loader
* @return incremental dumper
*/
@SneakyThrows(ReflectiveOperationException.class)
public static IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<?> position,
- final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ final PipelineChannel channel, final PipelineTableMetaDataLoader sourceMetaDataLoader) {
String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getName();
ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(databaseType);
Constructor<? extends IncrementalDumper> constructor = scalingEntry.getIncrementalDumperClass()
- .getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineDataSourceManager.class, PipelineChannel.class);
- return constructor.newInstance(dumperConfig, position, dataSourceManager, channel);
+ .getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineChannel.class, PipelineTableMetaDataLoader.class);
+ return constructor.newInstance(dumperConfig, position, channel, sourceMetaDataLoader);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 6009940..d90e75d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -79,13 +78,13 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
}
public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
- final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(dumperConfig, binlogPosition, dataSourceManager, channel);
+ final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+ super(dumperConfig, binlogPosition, channel, metaDataLoader);
this.binlogPosition = (BinlogPosition) binlogPosition;
this.dumperConfig = dumperConfig;
Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
this.channel = channel;
- metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+ this.metaDataLoader = metaDataLoader;
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index 809d5b9..5a22aec 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -19,9 +19,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -36,8 +37,9 @@ public final class MySQLInventoryDumper extends AbstractInventoryDumper {
private static final String YEAR_DATA_TYPE = "YEAR";
- public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(inventoryDumperConfig, dataSourceManager, channel);
+ public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
+ final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+ super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
Properties queryProps = new Properties();
queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
inventoryDumperConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 1b44d6a..173ee39 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -34,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteR
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -56,12 +58,20 @@ public final class MySQLIncrementalDumperTest {
private MultiplexMemoryPipelineChannel channel;
+ private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+
@Before
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
initTableData(dumperConfig);
channel = new MultiplexMemoryPipelineChannel();
- incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), new PipelineDataSourceManager(), channel);
+ PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+ incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
+ }
+
+ @After
+ public void tearDown() {
+ dataSourceManager.close();
}
private DumperConfiguration mockDumperConfiguration() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
index d74c70d..5dbfcf3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
@@ -20,13 +20,14 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
@@ -45,22 +46,20 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class MySQLJdbcDumperTest {
- private PipelineDataSourceManager dataSourceManager;
-
private MySQLInventoryDumper mysqlJdbcDumper;
- @Mock
- private Connection connection;
-
@Before
public void setUp() {
- dataSourceManager = new PipelineDataSourceManager();
- mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), dataSourceManager, new SimpleMemoryPipelineChannel(100));
+ PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+ InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
+ PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+ mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100),
+ dataSource, new PipelineTableMetaDataLoader(dataSource));
+ initTableData(dataSource);
}
private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
- initTableData(dumperConfig);
InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
result.setTableName("t_order");
return result;
@@ -73,8 +72,7 @@ public final class MySQLJdbcDumperTest {
}
@SneakyThrows(SQLException.class)
- private void initTableData(final DumperConfiguration dumperConfig) {
- DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+ private void initTableData(final DataSource dataSource) {
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
@@ -98,6 +96,7 @@ public final class MySQLJdbcDumperTest {
@Test
public void assertCreatePreparedStatement() throws SQLException {
+ Connection connection = mock(Connection.class);
when(connection.prepareStatement("", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)).thenReturn(mock(PreparedStatement.class));
PreparedStatement preparedStatement = mysqlJdbcDumper.createPreparedStatement(connection, "");
verify(preparedStatement).setFetchSize(Integer.MIN_VALUE);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index a91ba15..693d973 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -23,10 +23,10 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
@@ -64,15 +64,15 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
private final PipelineChannel channel;
public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
- final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(dumperConfig, position, dataSourceManager, channel);
+ final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+ super(dumperConfig, position, channel, metaDataLoader);
walPosition = (WalPosition) position;
if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
}
this.dumperConfig = dumperConfig;
this.channel = channel;
- walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager);
+ walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
index 4b9a005..2f6a38b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
@@ -19,10 +19,11 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.postgresql.util.PGobject;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -38,8 +39,9 @@ public final class PostgreSQLInventoryDumper extends AbstractInventoryDumper {
private static final String PG_BIT_TYPE = "bit";
- public PostgreSQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(inventoryDumperConfig, dataSourceManager, channel);
+ public PostgreSQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
+ final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+ super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 4ae61bc..effdcf1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -23,9 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter;
@@ -60,15 +60,15 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
private final PipelineChannel channel;
public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
- final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(dumperConfig, position, dataSourceManager, channel);
+ final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+ super(dumperConfig, position, channel, metaDataLoader);
walPosition = (WalPosition) position;
if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
}
this.dumperConfig = dumperConfig;
this.channel = channel;
- walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager);
+ walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index c65987f..57038e5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -44,9 +43,9 @@ public final class WalEventConverter {
private final PipelineTableMetaDataLoader metaDataLoader;
- public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager) {
+ public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
this.dumperConfig = dumperConfig;
- metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+ this.metaDataLoader = metaDataLoader;
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index 7733e00..7c0b852 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -20,9 +20,11 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.junit.Before;
import org.junit.Test;
@@ -37,27 +39,29 @@ import static org.junit.Assert.assertThat;
public final class PostgreSQLJdbcDumperTest {
- private PipelineDataSourceManager dataSourceManager;
+ private PipelineDataSourceWrapper dataSource;
private PostgreSQLInventoryDumper jdbcDumper;
@Before
public void setUp() {
- dataSourceManager = new PipelineDataSourceManager();
- jdbcDumper = new PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), dataSourceManager, new SimpleMemoryPipelineChannel(100));
+ PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+ InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
+ dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+ jdbcDumper = new PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100),
+ dataSource, new PipelineTableMetaDataLoader(dataSource));
+ initTableData(dataSource);
}
private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
- initTableData(dumperConfig);
InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
result.setTableName("t_order");
return result;
}
@SneakyThrows(SQLException.class)
- private void initTableData(final DumperConfiguration dumperConfig) {
- DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+ private void initTableData(final DataSource dataSource) {
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
@@ -68,7 +72,6 @@ public final class PostgreSQLJdbcDumperTest {
@Test
public void assertCreatePreparedStatement() throws SQLException {
- DataSource dataSource = dataSourceManager.getDataSource(mockDumperConfiguration().getDataSourceConfig());
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = jdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
assertThat(preparedStatement.getFetchSize(), is(1));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index c54da70..dbaf5a6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -18,14 +18,17 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -61,17 +64,26 @@ public final class PostgreSQLWalDumperTest {
private WalPosition position;
- private PostgreSQLWalDumper walDumper;
+ private DumperConfiguration dumperConfig;
- private StandardPipelineDataSourceConfiguration pipelineDataSourceConfig;
+ private PostgreSQLWalDumper walDumper;
private MultiplexMemoryPipelineChannel channel;
+ private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+
@Before
public void setUp() {
position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
channel = new MultiplexMemoryPipelineChannel();
- walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position, new PipelineDataSourceManager(), channel);
+ dumperConfig = mockDumperConfiguration();
+ PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+ walDumper = new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
+ }
+
+ @After
+ public void tearDown() {
+ dataSourceManager.close();
}
private DumperConfiguration mockDumperConfiguration() {
@@ -86,9 +98,9 @@ public final class PostgreSQLWalDumperTest {
} catch (final SQLException e) {
throw new RuntimeException("Init table failed", e);
}
- pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password);
+ PipelineDataSourceConfiguration dataSourceConfig = new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password);
DumperConfiguration result = new DumperConfiguration();
- result.setDataSourceConfig(pipelineDataSourceConfig);
+ result.setDataSourceConfig(dataSourceConfig);
Map<String, String> tableNameMap = new HashMap<>();
tableNameMap.put("t_order_0", "t_order");
result.setTableNameMap(tableNameMap);
@@ -97,9 +109,10 @@ public final class PostgreSQLWalDumperTest {
@Test
public void assertStart() throws SQLException, NoSuchFieldException, IllegalAccessException {
+ StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig();
try {
ReflectionUtil.setFieldValue(walDumper, "logicalReplication", logicalReplication);
- when(logicalReplication.createConnection(pipelineDataSourceConfig)).thenReturn(pgConnection);
+ when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection), position.getLogSequenceNumber()))
.thenReturn(pgReplicationStream);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index 1a18864..df3f1ab 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -25,11 +25,13 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderReco
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -48,13 +50,20 @@ public final class WalEventConverterTest {
private WalEventConverter walEventConverter;
+ private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+
@Before
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
- walEventConverter = new WalEventConverter(dumperConfig, new PipelineDataSourceManager());
+ walEventConverter = new WalEventConverter(dumperConfig, new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
initTableData(dumperConfig);
}
+ @After
+ public void tearDown() {
+ dataSourceManager.close();
+ }
+
private DumperConfiguration mockDumperConfiguration() {
DumperConfiguration result = new DumperConfiguration();
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 7349617..2ed943b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.api.impl;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -28,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstan
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -52,6 +54,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
public final class GovernanceRepositoryAPIImplTest {
@@ -130,14 +133,17 @@ public final class GovernanceRepositoryAPIImplTest {
dumperConfig.setTableName("t_order");
dumperConfig.setPrimaryKey("order_id");
dumperConfig.setShardingItem(0);
+ PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
+ PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(),
- new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine());
+ new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine());
}
private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
dumperConfig.setPosition(new PlaceholderPosition());
+ PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(),
- new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine());
+ new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index 32bec41..b97eebb 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -21,14 +21,14 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<FinishedPosition> {
public FixtureIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
- final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(dumperConfig, position, dataSourceManager, channel);
+ final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+ super(dumperConfig, position, channel, metaDataLoader);
}
@Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
index 7bc7a8c..3da4758 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
@@ -19,17 +19,19 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public final class FixtureInventoryDumper extends AbstractInventoryDumper {
- public FixtureInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(dumperConfig, dataSourceManager, channel);
+ public FixtureInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel,
+ final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+ super(dumperConfig, channel, dataSource, metaDataLoader);
}
@Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index bf1c14f..52765c2 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -18,8 +18,10 @@
package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
@@ -31,6 +33,7 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
public final class IncrementalTaskTest {
@@ -45,9 +48,10 @@ public final class IncrementalTaskTest {
public void setUp() {
TaskConfiguration taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfig();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
+ PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelFactory(),
- new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine());
+ new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
}
@Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 57320bc..b80d201 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -26,9 +26,11 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -42,12 +44,19 @@ public final class InventoryTaskTest {
private static TaskConfiguration taskConfig;
+ private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new PipelineDataSourceManager();
+
@BeforeClass
public static void beforeClass() {
PipelineContextUtil.mockModeConfig();
taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfig();
}
+ @AfterClass
+ public static void afterClass() {
+ DATA_SOURCE_MANAGER.close();
+ }
+
@Test(expected = IngestException.class)
public void assertStartWithGetEstimatedRowsFailure() {
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
@@ -57,9 +66,11 @@ public final class InventoryTaskTest {
position = new PrimaryKeyPosition(0, 1000);
}
inventoryDumperConfig.setPosition(position);
+ PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
+ PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelFactory(),
- new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine())) {
+ DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
inventoryTask.start();
}
}
@@ -74,9 +85,11 @@ public final class InventoryTaskTest {
position = new PrimaryKeyPosition(0, 1000);
}
inventoryDumperConfig.setPosition(position);
+ PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
+ PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelFactory(),
- new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine())) {
+ new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
inventoryTask.start();
assertFalse(inventoryTask.getProgress().getPosition() instanceof FinishedPosition);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index b7ae201..8a03427 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -51,12 +51,12 @@ public final class InventoryTaskSplitterTest {
@Before
public void setUp() {
initJobContext();
- dataSourceManager = new PipelineDataSourceManager();
inventoryTaskSplitter = new InventoryTaskSplitter();
}
private void initJobContext() {
jobContext = new RuleAlteredJobContext(ResourceUtil.mockJobConfig());
+ dataSourceManager = jobContext.getDataSourceManager();
taskConfig = jobContext.getTaskConfig();
}
@@ -64,7 +64,7 @@ public final class InventoryTaskSplitterTest {
public void assertSplitInventoryDataWithEmptyTable() throws SQLException {
taskConfig.getHandleConfig().setShardingSize(10);
initEmptyTablePrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
assertNotNull(actual);
assertThat(actual.size(), is(1));
assertThat(((PrimaryKeyPosition) actual.get(0).getProgress().getPosition()).getBeginValue(), is(0L));
@@ -75,7 +75,7 @@ public final class InventoryTaskSplitterTest {
public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
taskConfig.getHandleConfig().setShardingSize(10);
initIntPrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
assertNotNull(actual);
assertThat(actual.size(), is(10));
assertThat(((PrimaryKeyPosition) actual.get(9).getProgress().getPosition()).getBeginValue(), is(91L));
@@ -85,7 +85,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
initCharPrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -93,7 +93,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -101,7 +101,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
initNoPrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}