You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/02 10:20:38 UTC
[shardingsphere] branch master updated: Refactor CDC layer (#25994)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2d7bc66a905 Refactor CDC layer (#25994)
2d7bc66a905 is described below
commit 2d7bc66a905693a2eaab1f0d6ae61b2fc45c8510
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Jun 2 18:20:27 2023 +0800
Refactor CDC layer (#25994)
* Move AutoCloseable to PipelineTask
* Extract TaskExecuteCallback from InventoryTask and IncrementalTask
* Replace AutoCloseable to Closeable for PipelineTask
* Replace InventoryTask and IncrementalTask in InventoryIncrementalJobItemContext to PipelineTask, update related classes
* Extract AckCallbacks
* Extract PipelineTaskUtils
* Add inventory and incremental task for CDC; Refactor CDCJobPreparer
* Simplify InventoryTask params for better reuse
* Simplify IncrementalTask params for better reuse
* Remove getConnector() in migration
* Rename ImporterConnector to PipelineSink
* Rename connector to sink
* Remove jobShardingCount in PipelineSocketSink
* Add SingleChannelConsumerImporter
* Merge DataSourceImporter and PipelineDataSourceSink
* Replace DataSourceImporter ref; Remove DataSourceImporterCreator
* Add CDCImporter, CDCAckId;
Remove SocketSinkImporter, SocketSinkImporterCreator, CDCAckHolder, CDCDataRecordUtils, DataRecordComparatorGenerator;
Refactor PipelineChannel, PipelineSink and impls;
* Refactor CDCJob execution
* Refactor CDC job composing
* Remove unused classes; Remove PipelineSink.getType
* Refactor PipelineSink.getConnector() to identifierMatched()
* Refactor TreeMap to PriorityQueue in CDCImporter
* Update core unit test
* Update it unit test
* Remove channel from InventoryTask and IncrementalTask
* Refactor CDC job composing
* Update
* Clean code
* Simplify InventoryIncrementalTasksRunner constructor param
* Refactor CDCTasksRunner construction and stop; Cache CDCImporter before running
* Refactor CDCImporter, add CDCChannelProgressPair
* Update to pass CDC E2E MySQL
* Update to pass CDC E2E openGauss
* Add missed license header
* Use Caffeine cache replace Guava cache
---
.../api/executor/AbstractLifecycleExecutor.java | 9 -
.../api/ingest/channel/PipelineChannel.java | 22 +-
kernel/data-pipeline/cdc/core/pom.xml | 16 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 15 +-
.../cdc/context/job/CDCJobItemContext.java | 22 +-
.../data/pipeline/cdc/core/ack/CDCAckHolder.java | 87 -------
.../data/pipeline/cdc/core/ack/CDCAckId.java | 67 ++++++
.../data/pipeline/cdc/core/ack/CDCAckPosition.java | 14 --
.../connector/SocketSinkImporterConnector.java | 250 ---------------------
.../cdc/core/importer/CDCChannelProgressPair.java} | 27 +--
.../pipeline/cdc/core/importer/CDCImporter.java | 214 ++++++++++++++++++
.../cdc/core/importer/CDCImporterManager.java} | 49 ++--
.../pipeline/cdc/core/importer/CSNRecords.java} | 30 +--
.../cdc/core/importer/CSNRecordsComparator.java | 23 +-
.../cdc/core/importer/SocketSinkImporter.java | 120 ----------
.../core/importer/SocketSinkImporterCreator.java | 44 ----
.../cdc/core/importer/sink/CDCSocketSink.java | 113 ++++++++++
.../data/pipeline/cdc/core/job/CDCJob.java | 164 ++++++++++++--
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 116 +++++++---
.../pipeline/cdc/core/task/CDCIncrementalTask.java | 79 +++++++
.../pipeline/cdc/core/task/CDCInventoryTask.java | 86 +++++++
.../pipeline/cdc/core/task/CDCTasksRunner.java | 39 +++-
.../generator/DataRecordComparatorGenerator.java | 47 ----
.../pipeline/cdc/handler/CDCBackendHandler.java | 45 ++--
.../data/pipeline/cdc/util/CDCDataRecordUtils.java | 115 ----------
...here.data.pipeline.spi.importer.ImporterCreator | 18 --
.../pipeline/cdc/core/ack/CDCAckHolderTest.java | 64 ------
.../data/pipeline/cdc/core/ack/CDCAckIdTest.java | 24 +-
.../core/importer/CSNRecordsComparatorTest.java | 46 ++++
.../importer/SocketSinkImporterCreatorTest.java | 48 ----
.../DataRecordComparatorGeneratorTest.java | 55 -----
.../pipeline/cdc/util/CDCDataRecordUtilsTest.java | 73 ------
.../InventoryIncrementalJobItemContext.java | 8 +-
.../core/importer/DataSourceImporterCreator.java | 44 ----
.../importer/SingleChannelConsumerImporter.java | 74 ++++++
.../PipelineDataSourceSink.java} | 55 ++---
.../pipeline/core/ingest/channel/AckCallbacks.java | 61 +++++
.../memory/MultiplexMemoryPipelineChannel.java | 12 +-
.../memory/SimpleMemoryPipelineChannel.java | 26 ++-
.../core/ingest/dumper/InventoryDumper.java | 4 +-
.../pipeline/core/job/AbstractPipelineJob.java | 2 +-
.../data/pipeline/core/job/PipelineJobCenter.java | 10 +
.../core/prepare/InventoryTaskSplitter.java | 29 ++-
.../data/pipeline/core/task/IncrementalTask.java | 93 +-------
.../core/task/InventoryIncrementalTasksRunner.java | 15 +-
.../data/pipeline/core/task/InventoryTask.java | 76 +------
.../data/pipeline/core/task/PipelineTask.java | 3 +-
.../data/pipeline/core/task/PipelineTaskUtils.java | 89 ++++++++
.../TaskExecuteCallback.java} | 24 +-
.../data/pipeline/core/util/PipelineJdbcUtils.java | 20 ++
.../pipeline/spi/importer/ImporterCreator.java | 47 ----
.../importer/sink/PipelineSink.java} | 45 ++--
...here.data.pipeline.spi.importer.ImporterCreator | 18 --
.../pipeline/scenario/migration/MigrationJob.java | 3 +-
.../migration/context/MigrationJobItemContext.java | 9 +-
.../migration/prepare/MigrationJobPreparer.java | 60 +++--
.../pipeline/cases/PipelineContainerComposer.java | 9 +-
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 2 +-
.../cases/cdc/DataSourceRecordConsumer.java | 1 +
.../api/impl/GovernanceRepositoryAPIImplTest.java | 20 +-
.../pipeline/core/fixture/FixtureImporter.java | 4 +-
.../core/fixture/FixtureImporterCreator.java | 44 ----
.../FixtureInventoryIncrementalJobItemContext.java | 4 +-
...erTest.java => PipelineDataSourceSinkTest.java} | 32 +--
.../core/prepare/InventoryTaskSplitterTest.java | 2 +-
.../pipeline/core/task/IncrementalTaskTest.java | 15 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 27 ++-
...here.data.pipeline.spi.importer.ImporterCreator | 18 --
68 files changed, 1501 insertions(+), 1645 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
index 463da675642..ac008c426aa 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
@@ -23,7 +23,6 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
-import java.sql.Statement;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -50,7 +49,6 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
running = true;
startTimeMillis = System.currentTimeMillis();
runBlocking();
- stop();
}
/**
@@ -78,13 +76,6 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
protected abstract void doStop() throws SQLException;
- protected final void cancelStatement(final Statement statement) throws SQLException {
- if (null == statement || statement.isClosed()) {
- return;
- }
- statement.cancel();
- }
-
@Override
public final void run() {
start();
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
index 25ce5a44a59..c99a7986b1e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
@@ -19,13 +19,14 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.channel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import java.io.Closeable;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Pipeline channel.
*/
-public interface PipelineChannel {
+public interface PipelineChannel extends Closeable {
/**
* Push {@code DataRecord} into channel.
@@ -41,15 +42,30 @@ public interface PipelineChannel {
* @param batchSize record batch size
* @param timeout timeout
* @param timeUnit time unit
- * @return record
+ * @return records of transactions
*/
- List<Record> fetchRecords(int batchSize, int timeout, TimeUnit timeUnit);
+ List<Record> fetchRecords(int batchSize, long timeout, TimeUnit timeUnit);
+
+ /**
+ * Peek {@code Record} list from channel.
+ *
+ * @return records of a transaction
+ */
+ List<Record> peekRecords();
+
+ /**
+ * Poll {@code Record} list from channel.
+ *
+ * @return records of a transaction
+ */
+ List<Record> pollRecords();
/**
* Ack the last batch.
*
* @param records record list
*/
+ // TODO Refactor ack param
void ack(List<Record> records);
/**
diff --git a/kernel/data-pipeline/cdc/core/pom.xml b/kernel/data-pipeline/cdc/core/pom.xml
index e6992a4fe45..6a0f99297d0 100644
--- a/kernel/data-pipeline/cdc/core/pom.xml
+++ b/kernel/data-pipeline/cdc/core/pom.xml
@@ -40,16 +40,16 @@
</dependency>
<dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-test-util</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-params</artifactId>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-test-fixture-jdbc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 7ed1eb70836..0ffa569b115 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -71,7 +71,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDa
import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -204,14 +204,21 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
return result;
}
+ @Override
+ protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfiguration jobConfig) {
+ JobConfigurationPOJO result = super.convertJobConfiguration(jobConfig);
+ result.setShardingTotalCount(1);
+ return result;
+ }
+
/**
* Start job.
*
* @param jobId job id
- * @param importerConnector importer connector
+ * @param sink sink
*/
- public void startJob(final String jobId, final ImporterConnector importerConnector) {
- CDCJob job = new CDCJob(jobId, importerConnector);
+ public void startJob(final String jobId, final PipelineSink sink) {
+ CDCJob job = new CDCJob(jobId, sink);
PipelineJobCenter.addJob(jobId, job);
updateJobConfigurationDisabled(jobId, false);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index f42ef50242b..51284920cdb 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.context.job;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
@@ -35,7 +36,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import java.util.Collection;
import java.util.LinkedList;
@@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* CDC job item context.
*/
+@RequiredArgsConstructor
@Getter
public final class CDCJobItemContext implements InventoryIncrementalJobItemContext {
@@ -65,7 +67,7 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
private final PipelineDataSourceManager dataSourceManager;
- private final ImporterConnector importerConnector;
+ private final PipelineSink sink;
private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
@@ -91,17 +93,6 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
}
};
- public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final InventoryIncrementalJobItemProgress initProgress, final CDCProcessContext jobProcessContext,
- final CDCTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager, final ImporterConnector importerConnector) {
- this.jobConfig = jobConfig;
- this.shardingItem = shardingItem;
- this.initProgress = initProgress;
- this.jobProcessContext = jobProcessContext;
- this.taskConfig = taskConfig;
- this.dataSourceManager = dataSourceManager;
- this.importerConnector = importerConnector;
- }
-
@Override
public String getJobId() {
return jobConfig.getJobId();
@@ -134,9 +125,8 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
return sourceMetaDataLoaderLazyInitializer.get();
}
- @Override
- public ImporterConnector getImporterConnector() {
- return importerConnector;
+ public PipelineSink getSink() {
+ return sink;
}
@Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
deleted file mode 100644
index 7d5e07fea05..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * CDC ack holder.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CDCAckHolder {
-
- private static final CDCAckHolder INSTANCE = new CDCAckHolder();
-
- private final Map<String, Map<SocketSinkImporter, CDCAckPosition>> ackIdPositionMap = new ConcurrentHashMap<>();
-
- /**
- * the ack of CDC.
- *
- * @param ackId ack id
- */
- public void ack(final String ackId) {
- Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = ackIdPositionMap.remove(ackId);
- if (null != importerDataRecordMap) {
- importerDataRecordMap.forEach(SocketSinkImporter::ackWithLastDataRecord);
- }
- }
-
- /**
- * Bind ack id.
- *
- * @param importerDataRecordMap import data record map
- * @return ack id
- */
- public String bindAckIdWithPosition(final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
- String result = generateAckId();
- // TODO it's might need to persist to registry center in cluster mode.
- ackIdPositionMap.put(result, importerDataRecordMap);
- return result;
- }
-
- private String generateAckId() {
- return "ACK-" + UUID.randomUUID();
- }
-
- /**
- * Clean up.
- *
- * @param socketSinkImporter CDC importer
- */
- public void cleanUp(final SocketSinkImporter socketSinkImporter) {
- if (ackIdPositionMap.isEmpty()) {
- return;
- }
- ackIdPositionMap.entrySet().removeIf(entry -> entry.getValue().containsKey(socketSinkImporter));
- }
-
- /**
- * Get instance.
- *
- * @return CDC ack holder
- */
- public static CDCAckHolder getInstance() {
- return INSTANCE;
- }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckId.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckId.java
new file mode 100644
index 00000000000..f9e414cb04c
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckId.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
+
+import com.google.common.base.Splitter;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.util.List;
+
+/**
+ * CDC ack id.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCAckId {
+
+ private final String importerId;
+
+ private final String random;
+
+ /**
+ * Build ack id.
+ *
+ * @param importerId importer id
+ * @return ack id
+ */
+ public static CDCAckId build(final String importerId) {
+ return new CDCAckId(importerId, RandomStringUtils.randomAlphanumeric(16));
+ }
+
+ /**
+ * Marshal ack id.
+ *
+ * @return ack id
+ */
+ public String marshal() {
+ return importerId + "_" + random;
+ }
+
+ /**
+ * Unmarshal ack id from text.
+ *
+ * @param text text
+ * @return ack id
+ */
+ public static CDCAckId unmarshal(final String text) {
+ List<String> parts = Splitter.on('_').trimResults().omitEmptyStrings().splitToList(text);
+ return new CDCAckId(parts.get(0), parts.get(1));
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
index a62ad348b9d..e1391fc59bf 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
@@ -42,20 +42,6 @@ public final class CDCAckPosition {
createTimeMills = System.currentTimeMillis();
}
- public CDCAckPosition(final Record lastRecord, final long createTimeMills) {
- this.lastRecord = lastRecord;
- this.createTimeMills = createTimeMills;
- }
-
- /**
- * Add data record count.
- *
- * @param count count.
- */
- public void addDataRecordCount(final int count) {
- dataRecordCount.addAndGet(count);
- }
-
/**
* Get data record count.
*
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
deleted file mode 100644
index bc3f6314d67..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.connector;
-
-import io.netty.channel.Channel;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
-import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataRecordUtils;
-import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Socket sink importer connector.
- */
-@Slf4j
-public final class SocketSinkImporterConnector implements ImporterConnector, AutoCloseable {
-
- private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
-
- private final Lock lock = new ReentrantLock();
-
- private final Condition condition = lock.newCondition();
-
- @Setter
- private volatile boolean incrementalTaskRunning = true;
-
- private final ShardingSphereDatabase database;
-
- private final Channel channel;
-
- private final int jobShardingCount;
-
- private final Comparator<DataRecord> dataRecordComparator;
-
- private final Map<String, String> tableNameSchemaMap = new HashMap<>();
-
- private final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap = new ConcurrentHashMap<>();
-
- private final AtomicInteger runningIncrementalTaskCount = new AtomicInteger(0);
-
- private Thread incrementalImporterTask;
-
- public SocketSinkImporterConnector(final Channel channel, final ShardingSphereDatabase database, final int jobShardingCount, final Collection<String> schemaTableNames,
- final Comparator<DataRecord> dataRecordComparator) {
- this.channel = channel;
- this.database = database;
- this.jobShardingCount = jobShardingCount;
- schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
- String[] split = each.split("\\.");
- tableNameSchemaMap.put(split[1], split[0]);
- });
- this.dataRecordComparator = dataRecordComparator;
- }
-
- @Override
- public Object getConnector() {
- return channel;
- }
-
- /**
- * Write data record into channel.
- *
- * @param recordList data records
- * @param socketSinkImporter cdc importer
- * @param importerType importer type
- */
- public void write(final List<Record> recordList, final SocketSinkImporter socketSinkImporter, final ImporterType importerType) {
- if (recordList.isEmpty()) {
- return;
- }
- if (ImporterType.INVENTORY == importerType || null == dataRecordComparator) {
- int dataRecordCount = (int) recordList.stream().filter(DataRecord.class::isInstance).count();
- Record lastRecord = recordList.get(recordList.size() - 1);
- if (lastRecord instanceof FinishedRecord && 0 == dataRecordCount) {
- socketSinkImporter.ackWithLastDataRecord(new CDCAckPosition(lastRecord, 0));
- return;
- }
- Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
- importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(lastRecord, dataRecordCount));
- writeImmediately(recordList, importerDataRecordMap);
- } else if (ImporterType.INCREMENTAL == importerType) {
- writeIntoQueue(recordList, socketSinkImporter);
- }
- }
-
- private void writeImmediately(final List<? extends Record> recordList, final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
- doAwait();
- if (!channel.isActive()) {
- return;
- }
- List<DataRecordResult.Record> records = new LinkedList<>();
- for (Record each : recordList) {
- if (!(each instanceof DataRecord)) {
- continue;
- }
- DataRecord dataRecord = (DataRecord) each;
- records.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
- }
- String ackId = CDCAckHolder.getInstance().bindAckIdWithPosition(importerDataRecordMap);
- DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(records).setAckId(ackId).build();
- channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
- }
-
- @SuppressWarnings("ResultOfMethodCallIgnored")
- private void doAwait() {
- while (!channel.isWritable() && channel.isActive()) {
- lock.lock();
- try {
- condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException ignored) {
- Thread.currentThread().interrupt();
- } finally {
- lock.unlock();
- }
- }
- }
-
- @SneakyThrows(InterruptedException.class)
- private void writeIntoQueue(final List<Record> dataRecords, final SocketSinkImporter socketSinkImporter) {
- BlockingQueue<List<DataRecord>> blockingQueue = incrementalRecordMap.get(socketSinkImporter);
- if (null == blockingQueue) {
- log.warn("not find the queue to write");
- return;
- }
- Map<Long, List<DataRecord>> recordsMap = new LinkedHashMap<>();
- for (Record each : dataRecords) {
- if (!(each instanceof DataRecord)) {
- continue;
- }
- DataRecord dataRecord = (DataRecord) each;
- // TODO need improve if support global transaction
- recordsMap.computeIfAbsent(dataRecord.getCsn(), ignored -> new LinkedList<>()).add(dataRecord);
- }
- for (List<DataRecord> each : recordsMap.values()) {
- blockingQueue.put(each);
- }
- }
-
- /**
- * Send incremental start event.
- *
- * @param socketSinkImporter socket sink importer
- * @param batchSize batch size
- */
- public void sendIncrementalStartEvent(final SocketSinkImporter socketSinkImporter, final int batchSize) {
- incrementalRecordMap.computeIfAbsent(socketSinkImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
- int count = runningIncrementalTaskCount.incrementAndGet();
- if (count < jobShardingCount || null == dataRecordComparator) {
- return;
- }
- log.debug("start CDC incremental importer");
- if (null == incrementalImporterTask) {
- incrementalImporterTask = new Thread(new CDCIncrementalImporterTask(batchSize));
- incrementalImporterTask.start();
- }
- }
-
- /**
- * Clean socket sink importer connector.
- *
- * @param socketSinkImporter CDC importer
- */
- public void clean(final SocketSinkImporter socketSinkImporter) {
- incrementalRecordMap.remove(socketSinkImporter);
- if (ImporterType.INCREMENTAL == socketSinkImporter.getImporterType()) {
- incrementalTaskRunning = false;
- }
- }
-
- @Override
- public String getType() {
- return CDCSinkType.SOCKET.name();
- }
-
- @Override
- public void close() throws Exception {
- channel.close();
- }
-
- @RequiredArgsConstructor
- private final class CDCIncrementalImporterTask implements Runnable {
-
- private final int batchSize;
-
- @SneakyThrows(InterruptedException.class)
- @Override
- public void run() {
- while (incrementalTaskRunning) {
- Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
- List<DataRecord> dataRecords = new LinkedList<>();
- for (int i = 0; i < batchSize; i++) {
- List<DataRecord> minimumRecords = CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(incrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
- if (minimumRecords.isEmpty()) {
- break;
- }
- dataRecords.addAll(minimumRecords);
- }
- if (dataRecords.isEmpty()) {
- Thread.sleep(200L);
- } else {
- writeImmediately(dataRecords, cdcAckPositionMap);
- }
- }
- }
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCChannelProgressPair.java
similarity index 60%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCChannelProgressPair.java
index 89a58775c15..63ec0a1ccf3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCChannelProgressPair.java
@@ -15,24 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.importer.connector;
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
/**
- * Importer connector.
+ * CDC channel and progress listener pair.
*/
-public interface ImporterConnector {
+@RequiredArgsConstructor
+@Getter
+public final class CDCChannelProgressPair {
- /**
- * Get connector.
- *
- * @return connector
- */
- Object getConnector();
+ private final PipelineChannel channel;
- /**
- * Connector type.
- *
- * @return connector type
- */
- String getType();
+ private final PipelineJobProgressListener jobProgressListener;
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
new file mode 100644
index 00000000000..f5124b3bfbd
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * CDC importer.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class CDCImporter extends AbstractLifecycleExecutor implements Importer {
+
+ @Getter
+ private final String importerId = RandomStringUtils.randomAlphanumeric(8);
+
+ private final List<CDCChannelProgressPair> originalChannelProgressPairs;
+
+ private final int batchSize;
+
+ private final long timeout;
+
+ private final TimeUnit timeUnit;
+
+ private final PipelineSink sink;
+
+ private final boolean needSorting;
+
+ private final JobRateLimitAlgorithm rateLimitAlgorithm;
+
+ private final PriorityQueue<CSNRecords> csnRecordsQueue = new PriorityQueue<>(new CSNRecordsComparator());
+
+ private final Cache<String, Pair<CDCChannelProgressPair, CDCAckPosition>> ackCache = Caffeine.newBuilder().maximumSize(10000).expireAfterAccess(5, TimeUnit.MINUTES).build();
+
+ @Override
+ protected void runBlocking() {
+ CDCImporterManager.putImporter(this);
+ List<CDCChannelProgressPair> channelProgressPairs = new ArrayList<>(originalChannelProgressPairs);
+ while (isRunning()) {
+ if (needSorting) {
+ doWithSorting(channelProgressPairs);
+ } else {
+ doWithoutSorting(channelProgressPairs);
+ }
+ if (channelProgressPairs.isEmpty()) {
+ break;
+ }
+ }
+ }
+
+ private void doWithoutSorting(final List<CDCChannelProgressPair> channelProgressPairs) {
+ Iterator<CDCChannelProgressPair> channelProgressPairsIterator = channelProgressPairs.iterator();
+ while (channelProgressPairsIterator.hasNext()) {
+ CDCChannelProgressPair channelProgressPair = channelProgressPairsIterator.next();
+ PipelineChannel channel = channelProgressPair.getChannel();
+ List<Record> records = channel.fetchRecords(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+ if (records.isEmpty()) {
+ continue;
+ }
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+ }
+ String ackId = CDCAckId.build(importerId).marshal();
+ ackCache.put(ackId, Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))));
+ sink.write(ackId, records);
+ Record lastRecord = records.get(records.size() - 1);
+ if (lastRecord instanceof FinishedRecord) {
+ channelProgressPairsIterator.remove();
+ }
+ if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) {
+ channel.ack(records);
+ channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+ }
+ }
+ }
+
+ @SneakyThrows(InterruptedException.class)
+ private void doWithSorting(final List<CDCChannelProgressPair> channelProgressPairs) {
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+ }
+ prepareTransactionRecords(channelProgressPairs);
+ CSNRecords csnRecords = csnRecordsQueue.poll();
+ if (null == csnRecords) {
+ timeUnit.sleep(timeout);
+ return;
+ }
+ // TODO Combine small transactions into a large transaction, to improve transformation performance.
+ String ackId = CDCAckId.build(importerId).marshal();
+ List<Record> records = csnRecords.getRecords();
+ ackCache.put(ackId, Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))));
+ sink.write(ackId, filterDataRecords(records));
+ }
+
+ private int getDataRecordsCount(final List<Record> records) {
+ return (int) records.stream().filter(DataRecord.class::isInstance).count();
+ }
+
+ private List<Record> filterDataRecords(final List<Record> records) {
+ return records.stream().filter(DataRecord.class::isInstance).map(each -> (DataRecord) each).collect(Collectors.toList());
+ }
+
+ // TODO openGauss CSN should be incremented for every transaction. Currently, CSN might be duplicated in transactions.
+ // TODO Use channels watermark depth to improve performance.
+ private void prepareTransactionRecords(final List<CDCChannelProgressPair> channelProgressPairs) {
+ if (csnRecordsQueue.isEmpty()) {
+ for (CDCChannelProgressPair each : channelProgressPairs) {
+ PipelineChannel channel = each.getChannel();
+ List<Record> records = channel.pollRecords();
+ if (records.isEmpty()) {
+ continue;
+ }
+ if (0 == getDataRecordsCount(records)) {
+ channel.ack(records);
+ continue;
+ }
+ csnRecordsQueue.add(new CSNRecords(findFirstDataRecord(records).getCsn(), each, records));
+ }
+ } else {
+ CSNRecords csnRecords = csnRecordsQueue.peek();
+ long oldestCSN = findFirstDataRecord(csnRecords.getRecords()).getCsn();
+ for (CDCChannelProgressPair each : channelProgressPairs) {
+ PipelineChannel channel = each.getChannel();
+ List<Record> records = channel.peekRecords();
+ if (records.isEmpty()) {
+ continue;
+ }
+ if (0 == getDataRecordsCount(records)) {
+ records = channel.pollRecords();
+ channel.ack(records);
+ continue;
+ }
+ long csn = findFirstDataRecord(records).getCsn();
+ if (csn < oldestCSN) {
+ records = channel.pollRecords();
+ csnRecordsQueue.add(new CSNRecords(csn, each, records));
+ }
+ }
+ }
+ }
+
+ private DataRecord findFirstDataRecord(final List<Record> records) {
+ for (Record each : records) {
+ if (each instanceof DataRecord) {
+ return (DataRecord) each;
+ }
+ }
+ throw new IllegalStateException("No data record found");
+ }
+
+ /**
+ * Ack.
+ *
+ * @param ackId ack id
+ */
+ public void ack(final String ackId) {
+ Pair<CDCChannelProgressPair, CDCAckPosition> channelPositionPair = ackCache.getIfPresent(ackId);
+ if (null == channelPositionPair) {
+ log.warn("Could not find cached ack info, ack id: {}", ackId);
+ return;
+ }
+ CDCAckPosition ackPosition = channelPositionPair.getRight();
+ channelPositionPair.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord()));
+ channelPositionPair.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
+ }
+
+ @Override
+ protected void doStop() {
+ CDCImporterManager.removeImporter(importerId);
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterManager.java
similarity index 52%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterManager.java
index 1b090ee9b2a..62f684da2a1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterManager.java
@@ -15,46 +15,43 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.task;
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
-import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
- * Pipeline task interface.
+ * CDC importer manager.
*/
-public interface PipelineTask {
-
- /**
- * Start task.
- *
- * @return future
- */
- Collection<CompletableFuture<?>> start();
+public final class CDCImporterManager {
- /**
- * Stop task.
- */
- void stop();
+ private static final Map<String, CDCImporter> IMPORTER_MAP = new ConcurrentHashMap<>();
/**
- * Get task id.
+ * Put importer.
*
- * @return task id
+ * @param importer importer
*/
- String getTaskId();
+ public static void putImporter(final CDCImporter importer) {
+ IMPORTER_MAP.put(importer.getImporterId(), importer);
+ }
/**
- * Get task progress.
+ * Get importer.
*
- * @return task progress
+ * @param id importer id
+ * @return importer
*/
- TaskProgress getTaskProgress();
+ public static CDCImporter getImporter(final String id) {
+ return IMPORTER_MAP.get(id);
+ }
/**
- * Close.
+ * Remove importer.
+ *
+ * @param id importer id
*/
- void close();
+ public static void removeImporter(final String id) {
+ IMPORTER_MAP.remove(id);
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecords.java
similarity index 65%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecords.java
index 89a58775c15..7e6d3469eed 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecords.java
@@ -15,24 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.importer.connector;
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+
+import java.util.List;
/**
- * Importer connector.
+ * CSN records.
*/
-public interface ImporterConnector {
+@RequiredArgsConstructor
+@Getter
+public final class CSNRecords {
+
+ private final long csn;
- /**
- * Get connector.
- *
- * @return connector
- */
- Object getConnector();
+ private final CDCChannelProgressPair channelProgressPair;
- /**
- * Connector type.
- *
- * @return connector type
- */
- String getType();
+ private final List<Record> records;
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparator.java
similarity index 66%
copy from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparator.java
index 0953a373026..9b65c8b7130 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparator.java
@@ -15,19 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
-public final class FixtureImporterConnector implements ImporterConnector {
-
- @Override
- public Object getConnector() {
- return null;
- }
+import java.util.Comparator;
+
+/**
+ * CSN records comparator.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CSNRecordsComparator implements Comparator<CSNRecords> {
@Override
- public String getType() {
- return "FIXTURE";
+ public int compare(final CSNRecords o1, final CSNRecords o2) {
+ return Long.compare(o1.getCsn(), o2.getCsn());
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
deleted file mode 100644
index a6eb2942911..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/**
- * Socket sink importer.
- */
-@Slf4j
-public final class SocketSinkImporter extends AbstractLifecycleExecutor implements Importer {
-
- @Getter(AccessLevel.PROTECTED)
- private final ImporterConfiguration importerConfig;
-
- private final PipelineChannel channel;
-
- private final SocketSinkImporterConnector importerConnector;
-
- private final PipelineJobProgressListener jobProgressListener;
-
- @Getter
- private final ImporterType importerType;
-
- private final JobRateLimitAlgorithm rateLimitAlgorithm;
-
- public SocketSinkImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
- final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
- this.importerConfig = importerConfig;
- rateLimitAlgorithm = null == importerConfig ? null : importerConfig.getRateLimitAlgorithm();
- this.channel = channel;
- this.importerConnector = (SocketSinkImporterConnector) importerConnector;
- this.jobProgressListener = jobProgressListener;
- this.importerType = importerType;
- }
-
- @Override
- protected void runBlocking() {
- int batchSize = importerConfig.getBatchSize();
- if (ImporterType.INCREMENTAL == importerType) {
- importerConnector.sendIncrementalStartEvent(this, batchSize);
- }
- while (isRunning()) {
- List<Record> records = channel.fetchRecords(batchSize, 500, TimeUnit.MILLISECONDS);
- if (!records.isEmpty()) {
- List<Record> recordList = records.stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
- processDataRecords(recordList);
- if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
- break;
- }
- }
- }
- }
-
- private void processDataRecords(final List<Record> recordList) {
- if (null == recordList || recordList.isEmpty()) {
- return;
- }
- if (null != rateLimitAlgorithm) {
- rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
- }
- importerConnector.write(recordList, this, importerType);
- }
-
- /**
- * Ack with last data record.
- *
- * @param cdcAckPosition cdc ack position
- */
- public void ackWithLastDataRecord(final CDCAckPosition cdcAckPosition) {
- channel.ack(Collections.singletonList(cdcAckPosition.getLastRecord()));
- jobProgressListener.onProgressUpdated(new PipelineJobProgressUpdatedParameter(cdcAckPosition.getDataRecordCount()));
- }
-
- @Override
- protected void doStop() {
- if (ImporterType.INCREMENTAL == importerType) {
- importerConnector.clean(this);
- CDCAckHolder.getInstance().cleanUp(this);
- }
- }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
deleted file mode 100644
index bea3b61c1da..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-
-/**
- * Socket sink importer creator.
- */
-public final class SocketSinkImporterCreator implements ImporterCreator {
-
- @Override
- public Importer createImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
- final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
- return new SocketSinkImporter(importerConfig, importerConnector, channel, jobProgressListener, importerType);
- }
-
- @Override
- public String getType() {
- return CDCSinkType.SOCKET.name();
- }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
new file mode 100644
index 00000000000..155d5b6d332
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
+
+import io.netty.channel.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * CDC socket sink.
+ */
+@Slf4j
+public final class CDCSocketSink implements PipelineSink {
+
+ private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
+
+ private final Lock lock = new ReentrantLock();
+
+ private final Condition condition = lock.newCondition();
+
+ private final ShardingSphereDatabase database;
+
+ private final Channel channel;
+
+ private final Map<String, String> tableNameSchemaMap = new HashMap<>();
+
+ public CDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
+ this.channel = channel;
+ this.database = database;
+ schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
+ String[] split = each.split("\\.");
+ tableNameSchemaMap.put(split[1], split[0]);
+ });
+ }
+
+ @Override
+ public boolean identifierMatched(final Object identifier) {
+ return channel.id().equals(identifier);
+ }
+
+ @Override
+ public PipelineJobProgressUpdatedParameter write(final String ackId, final List<Record> records) {
+ if (records.isEmpty()) {
+ return new PipelineJobProgressUpdatedParameter(0);
+ }
+ while (!channel.isWritable() && channel.isActive()) {
+ doAwait();
+ }
+ if (!channel.isActive()) {
+ return new PipelineJobProgressUpdatedParameter(0);
+ }
+ List<DataRecordResult.Record> resultRecords = new LinkedList<>();
+ for (Record each : records) {
+ if (!(each instanceof DataRecord)) {
+ continue;
+ }
+ DataRecord dataRecord = (DataRecord) each;
+ resultRecords.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
+ }
+ DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
+ channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
+ return new PipelineJobProgressUpdatedParameter(resultRecords.size());
+ }
+
+ private void doAwait() {
+ lock.lock();
+ try {
+ condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index ee6ea603e61..314e1e16c53 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -17,9 +17,13 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.job;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
@@ -30,22 +34,32 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
/**
* CDC job.
*/
@Slf4j
-public final class CDCJob extends AbstractSimplePipelineJob {
+public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
- private final ImporterConnector importerConnector;
+ @Getter
+ private final PipelineSink sink;
private final CDCJobAPI jobAPI = new CDCJobAPI();
@@ -53,37 +67,147 @@ public final class CDCJob extends AbstractSimplePipelineJob {
private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
- public CDCJob(final String jobId, final ImporterConnector importerConnector) {
+ public CDCJob(final String jobId, final PipelineSink sink) {
super(jobId);
- this.importerConnector = importerConnector;
+ this.sink = sink;
}
@Override
- protected void doPrepare(final PipelineJobItemContext jobItemContext) {
- jobPreparer.initTasks((CDCJobItemContext) jobItemContext);
+ public void execute(final ShardingContext shardingContext) {
+ String jobId = shardingContext.getJobName();
+ log.info("Execute job {}", jobId);
+ CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ List<CDCJobItemContext> jobItemContexts = new LinkedList<>();
+ for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
+ if (isStopping()) {
+ log.info("stopping true, ignore");
+ return;
+ }
+ CDCJobItemContext jobItemContext = buildPipelineJobItemContext(jobConfig, shardingItem);
+ PipelineTasksRunner tasksRunner = new CDCTasksRunner(jobItemContext);
+ if (!addTasksRunner(shardingItem, tasksRunner)) {
+ continue;
+ }
+ jobItemContexts.add(jobItemContext);
+ jobAPI.cleanJobItemErrorMessage(jobId, shardingItem);
+ log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
+ }
+ if (jobItemContexts.isEmpty()) {
+ log.warn("job item contexts empty, ignore");
+ return;
+ }
+ prepare(jobItemContexts);
+ executeInventoryTasks(jobItemContexts);
+ executeIncrementalTasks(jobItemContexts);
}
- @Override
- protected PipelineJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
- int shardingItem = shardingContext.getShardingItem();
- CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+ private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
+ Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingItem);
CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
- return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, importerConnector);
+ return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
+ }
+
+ private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
+ try {
+ jobPreparer.initTasks(jobItemContexts);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ for (PipelineJobItemContext each : jobItemContexts) {
+ processFailed(each, ex);
+ }
+ throw ex;
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ for (PipelineJobItemContext each : jobItemContexts) {
+ processFailed(each, ex);
+ }
+ throw new PipelineInternalException(ex);
+ }
}
@Override
- protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
- InventoryIncrementalJobItemContext jobItemContext = (InventoryIncrementalJobItemContext) pipelineJobItemContext;
- return new CDCTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
+ protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
+ String jobId = jobItemContext.getJobId();
+ log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
+ jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
+ PipelineJobCenter.stop(jobId);
+ jobAPI.updateJobConfigurationDisabled(jobId, true);
+ }
+
+ private void executeInventoryTasks(final List<CDCJobItemContext> jobItemContexts) {
+ Collection<CompletableFuture<?>> futures = new LinkedList<>();
+ for (CDCJobItemContext each : jobItemContexts) {
+ updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
+ for (PipelineTask task : each.getInventoryTasks()) {
+ if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
+ continue;
+ }
+ futures.addAll(task.start());
+ }
+ }
+ if (futures.isEmpty()) {
+ return;
+ }
+ ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", jobItemContexts.get(0)));
+ }
+
+ private void updateLocalAndRemoteJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
+ jobItemContext.setStatus(jobStatus);
+ jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
+ }
+
+ private void executeIncrementalTasks(final List<CDCJobItemContext> jobItemContexts) {
+ log.info("execute incremental tasks, jobId={}", getJobId());
+ Collection<CompletableFuture<?>> futures = new LinkedList<>();
+ for (CDCJobItemContext each : jobItemContexts) {
+ if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
+ log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
+ return;
+ }
+ updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK);
+ for (PipelineTask task : each.getIncrementalTasks()) {
+ if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
+ continue;
+ }
+ futures.addAll(task.start());
+ }
+ }
+ ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", jobItemContexts.get(0)));
+ }
+
+ @Override
+ protected void doPrepare(final PipelineJobItemContext jobItemContext) {
+ throw new UnsupportedOperationException();
}
@Override
protected void doClean() {
dataSourceManager.close();
- if (importerConnector instanceof AutoCloseable) {
- CloseUtils.closeQuietly((AutoCloseable) importerConnector);
+ CloseUtils.closeQuietly(sink);
+ }
+
+ @RequiredArgsConstructor
+ private final class CDCExecuteCallback implements ExecuteCallback {
+
+ private final String identifier;
+
+ private final PipelineJobItemContext jobItemContext;
+
+ @Override
+ public void onSuccess() {
+ log.info("onSuccess, all {} tasks finished.", identifier);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ log.error("onFailure, {} task execute failed.", identifier, throwable);
+ String jobId = jobItemContext.getJobId();
+ jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+ PipelineJobCenter.stop(jobId);
+ jobAPI.updateJobConfigurationDisabled(jobId, true);
}
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index b5b5f52d2b1..5fb6519ca36 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -18,26 +18,45 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
-import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
+import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* CDC job preparer.
@@ -50,9 +69,21 @@ public final class CDCJobPreparer {
/**
* Do prepare work.
*
- * @param jobItemContext job item context
+ * @param jobItemContexts job item contexts
*/
- public void initTasks(final CDCJobItemContext jobItemContext) {
+ public void initTasks(final Collection<CDCJobItemContext> jobItemContexts) {
+ // TODO Use pipeline tree to build it
+ AtomicBoolean inventoryImporterUsed = new AtomicBoolean();
+ List<CDCChannelProgressPair> inventoryChannelProgressPairs = new LinkedList<>();
+ AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
+ List<CDCChannelProgressPair> incrementalChannelProgressPairs = new LinkedList<>();
+ for (CDCJobItemContext each : jobItemContexts) {
+ initTasks0(each, inventoryImporterUsed, inventoryChannelProgressPairs, incrementalImporterUsed, incrementalChannelProgressPairs);
+ }
+ }
+
+ private void initTasks0(final CDCJobItemContext jobItemContext, final AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> inventoryChannelProgressPairs,
+ final AtomicBoolean incrementalImporterUsed, final List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
if (!jobItemProgress.isPresent()) {
jobAPI.persistJobItemProgress(jobItemContext);
@@ -61,23 +92,14 @@ public final class CDCJobPreparer {
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
- initIncrementalTasks(jobItemContext);
- CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
- if (jobConfig.isFull()) {
- initInventoryTasks(jobItemContext);
+ prepareIncremental(jobItemContext);
+ if (jobItemContext.getJobConfig().isFull()) {
+ initInventoryTasks(jobItemContext, inventoryImporterUsed, inventoryChannelProgressPairs);
}
+ initIncrementalTask(jobItemContext, incrementalImporterUsed, incrementalChannelProgressPairs);
}
- private void initInventoryTasks(final CDCJobItemContext jobItemContext) {
- CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
- // TODO channel requires a new implementation
- InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
- InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperConfig, taskConfig.getImporterConfig());
- List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobItemContext);
- jobItemContext.getInventoryTasks().addAll(allInventoryTasks);
- }
-
- private void initIncrementalTasks(final CDCJobItemContext jobItemContext) {
+ private void prepareIncremental(final CDCJobItemContext jobItemContext) {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
try {
@@ -85,11 +107,51 @@ public final class CDCJobPreparer {
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
}
- PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
- PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
- ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
- IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
- pipelineChannelCreator, jobItemContext.getImporterConnector(), sourceMetaDataLoader, incrementalExecuteEngine, jobItemContext);
+ }
+
+ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
+ long startTimeMillis = System.currentTimeMillis();
+ CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+ ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
+ CDCProcessContext processContext = jobItemContext.getJobProcessContext();
+ for (InventoryDumperConfiguration each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperConfiguration(taskConfig.getDumperConfig()), importerConfig)
+ .splitInventoryDumperConfig(jobItemContext)) {
+ AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
+ PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
+ channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
+ Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
+ Importer importer = importerUsed.get() ? null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
+ needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperConfig().getDataSourceConfig().getDatabaseType())),
+ importerConfig.getRateLimitAlgorithm());
+ jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
+ processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
+ importerUsed.set(true);
+ }
+ log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
+ }
+
+ private boolean needSorting(final ImporterType importerType, final boolean hasGlobalCSN) {
+ return ImporterType.INCREMENTAL == importerType && hasGlobalCSN;
+ }
+
+ private boolean hasGlobalCSN(final DatabaseType databaseType) {
+ return databaseType instanceof OpenGaussDatabaseType;
+ }
+
+ private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
+ CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+ DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
+ ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
+ IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), jobItemContext.getInitProgress());
+ PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress);
+ channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
+ Dumper dumper = PipelineTypedSPILoader.getDatabaseTypedService(IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType())
+ .createIncrementalDumper(dumperConfig, dumperConfig.getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
+ boolean needSorting = needSorting(ImporterType.INCREMENTAL, hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType()));
+ Importer importer = importerUsed.get() ? null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS,
+ jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
+ PipelineTask incrementalTask = new CDCIncrementalTask(dumperConfig.getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
+ importerUsed.set(true);
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
new file mode 100644
index 00000000000..ed872eb8f76
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.task;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.TaskExecuteCallback;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * CDC incremental task.
+ */
+@RequiredArgsConstructor
+@Slf4j
+@ToString(exclude = {"incrementalExecuteEngine", "dumper", "importer", "taskProgress"})
+public final class CDCIncrementalTask implements PipelineTask {
+
+ @Getter
+ private final String taskId;
+
+ private final ExecuteEngine incrementalExecuteEngine;
+
+ private final Dumper dumper;
+
+ @Nullable
+ private final Importer importer;
+
+ @Getter
+ private final IncrementalTaskProgress taskProgress;
+
+ @Override
+ public Collection<CompletableFuture<?>> start() {
+ taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
+ Collection<CompletableFuture<?>> result = new LinkedList<>();
+ result.add(incrementalExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
+ if (null != importer) {
+ result.add(incrementalExecuteEngine.submit(importer, new TaskExecuteCallback(this)));
+ }
+ return result;
+ }
+
+ @Override
+ public void stop() {
+ dumper.stop();
+ if (null != importer) {
+ importer.stop();
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
new file mode 100644
index 00000000000..eda3b485897
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.task;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.TaskExecuteCallback;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * CDC inventory task.
+ */
+@RequiredArgsConstructor
+@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "dumper", "importer"})
+@Slf4j
+public final class CDCInventoryTask implements PipelineTask {
+
+ @Getter
+ private final String taskId;
+
+ private final ExecuteEngine inventoryDumperExecuteEngine;
+
+ private final ExecuteEngine inventoryImporterExecuteEngine;
+
+ private final Dumper dumper;
+
+ @Nullable
+ private final Importer importer;
+
+ private final AtomicReference<IngestPosition> position;
+
+ @Override
+ public Collection<CompletableFuture<?>> start() {
+ Collection<CompletableFuture<?>> result = new LinkedList<>();
+ result.add(inventoryDumperExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
+ if (null != importer) {
+ result.add(inventoryImporterExecuteEngine.submit(importer, new TaskExecuteCallback(this)));
+ }
+ return result;
+ }
+
+ @Override
+ public void stop() {
+ dumper.stop();
+ if (null != importer) {
+ importer.stop();
+ }
+ }
+
+ @Override
+ public InventoryTaskProgress getTaskProgress() {
+ return new InventoryTaskProgress(position.get());
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
index 9f78ddf90b8..60ed3fb3915 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
@@ -18,22 +18,49 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.task;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
import java.util.Collection;
/**
* CDC tasks runner.
*/
-public final class CDCTasksRunner extends InventoryIncrementalTasksRunner {
+public final class CDCTasksRunner implements PipelineTasksRunner {
- public CDCTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<PipelineTask> inventoryTasks, final Collection<PipelineTask> incrementalTasks) {
- super(jobItemContext, inventoryTasks, incrementalTasks);
+ private final InventoryIncrementalJobItemContext jobItemContext;
+
+ private final Collection<PipelineTask> inventoryTasks;
+
+ private final Collection<PipelineTask> incrementalTasks;
+
+ public CDCTasksRunner(final InventoryIncrementalJobItemContext jobItemContext) {
+ this.jobItemContext = jobItemContext;
+ inventoryTasks = jobItemContext.getInventoryTasks();
+ incrementalTasks = jobItemContext.getIncrementalTasks();
+ }
+
+ @Override
+ public PipelineJobItemContext getJobItemContext() {
+ return jobItemContext;
+ }
+
+ @Override
+ public void start() {
}
@Override
- protected void inventorySuccessCallback() {
- executeIncrementalTask();
+ public void stop() {
+ jobItemContext.setStopping(true);
+ for (PipelineTask each : inventoryTasks) {
+ each.stop();
+ CloseUtils.closeQuietly(each);
+ }
+ for (PipelineTask each : incrementalTasks) {
+ each.stop();
+ CloseUtils.closeQuietly(each);
+ }
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java
deleted file mode 100644
index 10bd3a9c6a2..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.generator;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
-
-import java.util.Comparator;
-
-/**
- * Data record comparator generator.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class DataRecordComparatorGenerator {
-
- /**
- * Generator comparator.
- *
- * @param databaseType database type
- * @return data record comparator
- */
- public static Comparator<DataRecord> generatorIncrementalComparator(final DatabaseType databaseType) {
- if (databaseType instanceof OpenGaussDatabaseType) {
- return Comparator.comparing(DataRecord::getCsn, Comparator.nullsFirst(Comparator.naturalOrder()));
- }
- // TODO MySQL and PostgreSQL not support now
- return null;
- }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 1d0751d8adc..69a58617e37 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -21,21 +21,20 @@ import com.google.common.base.Strings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
-import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
-import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
-import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
@@ -57,7 +56,6 @@ import org.apache.shardingsphere.single.rule.SingleRule;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -152,10 +150,7 @@ public final class CDCBackendHandler {
PipelineJobCenter.stop(jobId);
}
ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
- Comparator<DataRecord> dataRecordComparator = cdcJobConfig.isDecodeWithTX()
- ? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
- : null;
- jobAPI.startJob(jobId, new SocketSinkImporterConnector(channel, database, cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
+ jobAPI.startJob(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
connectionContext.setJobId(jobId);
}
@@ -170,22 +165,14 @@ public final class CDCBackendHandler {
log.warn("job id is null or empty, ignored");
return;
}
- List<Integer> shardingItems = new ArrayList<>(PipelineJobCenter.getShardingItems(jobId));
- if (shardingItems.isEmpty()) {
+ CDCJob job = (CDCJob) PipelineJobCenter.getJob(jobId);
+ if (null == job) {
return;
}
- Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItems.get(0));
- if (!jobItemContext.isPresent()) {
- return;
- }
- CDCJobItemContext cdcJobItemContext = (CDCJobItemContext) jobItemContext.get();
- if (cdcJobItemContext.getImporterConnector() instanceof SocketSinkImporterConnector) {
- Channel channel = (Channel) cdcJobItemContext.getImporterConnector().getConnector();
- if (channelId.equals(channel.id())) {
- log.info("close CDC job, channel id: {}", channelId);
- PipelineJobCenter.stop(jobId);
- jobAPI.updateJobConfigurationDisabled(jobId, true);
- }
+ if (job.getSink().identifierMatched(channelId)) {
+ log.info("close CDC job, channel id: {}", channelId);
+ PipelineJobCenter.stop(jobId);
+ jobAPI.updateJobConfigurationDisabled(jobId, true);
}
}
@@ -214,6 +201,12 @@ public final class CDCBackendHandler {
* @param requestBody request body
*/
public void processAck(final AckStreamingRequestBody requestBody) {
- CDCAckHolder.getInstance().ack(requestBody.getAckId());
+ CDCAckId ackId = CDCAckId.unmarshal(requestBody.getAckId());
+ CDCImporter importer = CDCImporterManager.getImporter(ackId.getImporterId());
+ if (null == importer) {
+ log.warn("Could not find importer, ack id: {}", ackId.marshal());
+ return;
+ }
+ importer.ack(ackId.marshal());
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
deleted file mode 100644
index 6189f2ba161..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.util;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * CDC data record utility class.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CDCDataRecordUtils {
-
- /**
- * Find minimum data record and save position.
- *
- * @param incrementalRecordMap CDC ack position map.
- * @param dataRecordComparator CDC ack position map.
- * @param cdcAckPositionMap CDC ack position map.
- * @return minimum data record
- */
- public static List<DataRecord> findMinimumDataRecordsAndSavePosition(final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
- final Comparator<DataRecord> dataRecordComparator, final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
- if (null == dataRecordComparator) {
- return findMinimumDataRecordWithoutComparator(incrementalRecordMap, cdcAckPositionMap);
- } else {
- return findMinimumDataRecordWithComparator(incrementalRecordMap, cdcAckPositionMap, dataRecordComparator);
- }
- }
-
- private static List<DataRecord> findMinimumDataRecordWithoutComparator(final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
- final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
- for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry : incrementalRecordMap.entrySet()) {
- List<DataRecord> records = entry.getValue().poll();
- if (null == records || records.isEmpty()) {
- continue;
- }
- DataRecord lastRecord = records.get(records.size() - 1);
- saveAckPosition(cdcAckPositionMap, entry.getKey(), lastRecord);
- return records;
- }
- return Collections.emptyList();
- }
-
- private static void saveAckPosition(final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final SocketSinkImporter socketSinkImporter, final Record record) {
- CDCAckPosition cdcAckPosition = cdcAckPositionMap.get(socketSinkImporter);
- if (null == cdcAckPosition) {
- cdcAckPositionMap.put(socketSinkImporter, new CDCAckPosition(record, 1));
- } else {
- cdcAckPosition.setLastRecord(record);
- cdcAckPosition.addDataRecordCount(cdcAckPosition.getDataRecordCount());
- }
- }
-
- private static List<DataRecord> findMinimumDataRecordWithComparator(final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
- final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final Comparator<DataRecord> dataRecordComparator) {
- Map<SocketSinkImporter, List<DataRecord>> waitSortedMap = new HashMap<>();
- for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry : incrementalRecordMap.entrySet()) {
- List<DataRecord> peek = entry.getValue().peek();
- if (null == peek) {
- continue;
- }
- waitSortedMap.put(entry.getKey(), peek);
- }
- if (waitSortedMap.isEmpty()) {
- return Collections.emptyList();
- }
- List<DataRecord> result = null;
- SocketSinkImporter belongImporter = null;
- for (Entry<SocketSinkImporter, List<DataRecord>> entry : waitSortedMap.entrySet()) {
- if (null == result) {
- result = entry.getValue();
- belongImporter = entry.getKey();
- continue;
- }
- if (dataRecordComparator.compare(result.get(0), entry.getValue().get(0)) > 0) {
- result = entry.getValue();
- belongImporter = entry.getKey();
- }
- }
- if (null == result) {
- return Collections.emptyList();
- }
- incrementalRecordMap.get(belongImporter).poll();
- saveAckPosition(cdcAckPositionMap, belongImporter, result.get(result.size() - 1));
- return result;
- }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
deleted file mode 100644
index 8b03ec4f243..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporterCreator
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
deleted file mode 100644
index ffc722d6ad2..00000000000
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
-
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-import org.apache.shardingsphere.infra.util.reflection.ReflectionUtils;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-class CDCAckHolderTest {
-
- @Test
- void assertBindAckIdWithPositionAndAck() {
- CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
- final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
- SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
- importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0));
- Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>> ackIdPositionMap = ReflectionUtils.getFieldValue(cdcAckHolder, "ackIdPositionMap");
- assertTrue(ackIdPositionMap.isPresent());
- assertTrue(ackIdPositionMap.get().isEmpty());
- String ackId = cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
- assertThat(ackIdPositionMap.get().size(), is(1));
- cdcAckHolder.ack(ackId);
- assertTrue(ackIdPositionMap.get().isEmpty());
- }
-
- @Test
- void assertCleanUpTimeoutAckId() {
- CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
- final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
- SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
- importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), System.currentTimeMillis() - 60 * 1000 * 10));
- cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
- cdcAckHolder.cleanUp(socketSinkImporter);
- Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>> ackIdPositionMap = ReflectionUtils.getFieldValue(cdcAckHolder, "ackIdPositionMap");
- assertTrue(ackIdPositionMap.isPresent());
- assertTrue(ackIdPositionMap.get().isEmpty());
- }
-}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckIdTest.java
similarity index 60%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java
rename to kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckIdTest.java
index 0953a373026..e4cd727c2c4 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckIdTest.java
@@ -15,19 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.junit.jupiter.api.Test;
-public final class FixtureImporterConnector implements ImporterConnector {
-
- @Override
- public Object getConnector() {
- return null;
- }
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class CDCAckIdTest {
- @Override
- public String getType() {
- return "FIXTURE";
+ @Test
+ void assertBuild() {
+ CDCAckId expected = CDCAckId.build("importer1");
+ String text = expected.marshal();
+ CDCAckId actual = CDCAckId.unmarshal(text);
+ assertThat(actual.getImporterId(), is(expected.getImporterId()));
+ assertThat(actual.getRandom(), is(expected.getRandom()));
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
new file mode 100644
index 00000000000..96587fb77a6
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.PriorityQueue;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+class CSNRecordsComparatorTest {
+
+ @Test
+ void assertSort() {
+ PipelineChannel channel = mock(PipelineChannel.class);
+ PriorityQueue<CSNRecords> queue = new PriorityQueue<>(new CSNRecordsComparator());
+ CDCChannelProgressPair channelProgressPair = new CDCChannelProgressPair(channel, mock(PipelineJobProgressListener.class));
+ queue.add(new CSNRecords(3L, channelProgressPair, Collections.emptyList()));
+ queue.add(new CSNRecords(1L, channelProgressPair, Collections.emptyList()));
+ queue.add(new CSNRecords(2L, channelProgressPair, Collections.emptyList()));
+ assertThat(queue.size(), is(3));
+ assertThat(queue.poll().getCsn(), is(1L));
+ assertThat(queue.poll().getCsn(), is(2L));
+ assertThat(queue.poll().getCsn(), is(3L));
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
deleted file mode 100644
index 21c050be78f..00000000000
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
-
-import io.netty.channel.Channel;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.mock;
-
-@ExtendWith(MockitoExtension.class)
-class SocketSinkImporterCreatorTest {
-
- @Mock
- private ImporterConfiguration importerConfig;
-
- @Test
- void assertCreateCDCImporter() {
- SocketSinkImporterConnector importerConnector = new SocketSinkImporterConnector(mock(Channel.class), mock(ShardingSphereDatabase.class), 1, Collections.emptyList(), null);
- assertThat(TypedSPILoader.getService(ImporterCreator.class, "Socket").createImporter(importerConfig, importerConnector, null, null, null), instanceOf(SocketSinkImporter.class));
- }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java
deleted file mode 100644
index 7ddd2b43e4f..00000000000
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.generator;
-
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
-import org.junit.jupiter.api.Test;
-
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-class DataRecordComparatorGeneratorTest {
-
- @Test
- void assertGeneratorIncrementalComparator() {
- Comparator<DataRecord> dataRecordComparator = DataRecordComparatorGenerator.generatorIncrementalComparator(new OpenGaussDatabaseType());
- List<DataRecord> dataRecords = new LinkedList<>();
- dataRecords.add(generateDataRecord(1L));
- dataRecords.add(generateDataRecord(100L));
- dataRecords.add(generateDataRecord(0L));
- dataRecords.add(generateDataRecord(null));
- dataRecords.sort(dataRecordComparator);
- assertNull(dataRecords.get(0).getCsn());
- assertThat(dataRecords.get(1).getCsn(), is(0L));
- assertThat(dataRecords.get(2).getCsn(), is(1L));
- assertThat(dataRecords.get(3).getCsn(), is(100L));
- }
-
- private DataRecord generateDataRecord(final Long csn) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 0);
- result.setCsn(csn);
- return result;
- }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
deleted file mode 100644
index 514c948b6fb..00000000000
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.util;
-
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
-import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-class CDCDataRecordUtilsTest {
-
- @Test
- void assertFindMinimumDataRecordAndSavePosition() throws InterruptedException {
- final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> actualIncrementalRecordMap = new HashMap<>();
- ArrayBlockingQueue<List<DataRecord>> queueFirst = new ArrayBlockingQueue<>(5);
- queueFirst.put(Collections.singletonList(generateDataRecord(0)));
- queueFirst.put(Collections.singletonList(generateDataRecord(2)));
- queueFirst.put(Collections.singletonList(generateDataRecord(4)));
- SocketSinkImporter mockSocketSinkImporterFirst = mock(SocketSinkImporter.class);
- actualIncrementalRecordMap.put(mockSocketSinkImporterFirst, queueFirst);
- ArrayBlockingQueue<List<DataRecord>> queueSecond = new ArrayBlockingQueue<>(5);
- queueSecond.put(Collections.singletonList(generateDataRecord(1)));
- queueSecond.put(Collections.singletonList(generateDataRecord(3)));
- queueSecond.put(Collections.singletonList(generateDataRecord(5)));
- SocketSinkImporter mockSocketSinkImporterSecond = mock(SocketSinkImporter.class);
- actualIncrementalRecordMap.put(mockSocketSinkImporterSecond, queueSecond);
- Comparator<DataRecord> dataRecordComparator = DataRecordComparatorGenerator.generatorIncrementalComparator(new OpenGaussDatabaseType());
- final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
- for (long i = 0; i <= 5; i++) {
- List<DataRecord> minimumDataRecord = CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(actualIncrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
- assertThat(minimumDataRecord.size(), is(1));
- assertThat(minimumDataRecord.get(0).getCsn(), is(i));
- }
- assertTrue(CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(actualIncrementalRecordMap, dataRecordComparator, cdcAckPositionMap).isEmpty());
- }
-
- private DataRecord generateDataRecord(final long csn) {
- DataRecord dataRecord = new DataRecord(new PlaceholderPosition(), 0);
- dataRecord.setCsn(csn);
- return dataRecord;
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index c77a33219b2..c729ebed79d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import java.util.Collection;
@@ -63,11 +63,11 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
PipelineTableMetaDataLoader getSourceMetaDataLoader();
/**
- * Get importer connector.
+ * Get sink.
*
- * @return importer connector
+ * @return sink
*/
- ImporterConnector getImporterConnector();
+ PipelineSink getSink();
/**
* Get processed record count.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
deleted file mode 100644
index 900d674bf8b..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-
-/**
- * Data source importer creator.
- */
-public final class DataSourceImporterCreator implements ImporterCreator {
-
- @Override
- public Importer createImporter(final ImporterConfiguration importerConfig,
- final ImporterConnector importerConnector, final PipelineChannel channel,
- final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
- return new DataSourceImporter(importerConfig, importerConnector, channel, jobProgressListener);
- }
-
- @Override
- public String getType() {
- return "DataSource";
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
new file mode 100644
index 00000000000..52819ef48a3
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.importer;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Single channel consumer importer.
+ */
+@RequiredArgsConstructor
+public final class SingleChannelConsumerImporter extends AbstractLifecycleExecutor implements Importer {
+
+ private final PipelineChannel channel;
+
+ private final int batchSize;
+
+ private final int timeout;
+
+ private final TimeUnit timeUnit;
+
+ private final PipelineSink sink;
+
+ private final PipelineJobProgressListener jobProgressListener;
+
+ @Override
+ protected void runBlocking() {
+ while (isRunning()) {
+ List<Record> records = channel.fetchRecords(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+ if (records.isEmpty()) {
+ continue;
+ }
+ PipelineJobProgressUpdatedParameter updatedParam = sink.write("", records);
+ channel.ack(records);
+ jobProgressListener.onProgressUpdated(updatedParam);
+ if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ CloseUtils.closeQuietly(sink);
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
similarity index 85%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 2a662553ec0..13720675810 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.importer;
+package org.apache.shardingsphere.data.pipeline.core.importer.sink;
import lombok.AccessLevel;
import lombok.Getter;
@@ -23,22 +23,19 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
+import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
@@ -52,7 +49,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -61,7 +57,7 @@ import java.util.stream.IntStream;
* Default importer.
*/
@Slf4j
-public final class DataSourceImporter extends AbstractLifecycleExecutor implements Importer {
+public final class PipelineDataSourceSink implements PipelineSink {
private static final DataRecordMerger MERGER = new DataRecordMerger();
@@ -72,10 +68,6 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
private final PipelineSQLBuilder pipelineSqlBuilder;
- private final PipelineChannel channel;
-
- private final PipelineJobProgressListener jobProgressListener;
-
private final JobRateLimitAlgorithm rateLimitAlgorithm;
private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();
@@ -84,30 +76,21 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
private final AtomicReference<Statement> batchDeleteStatement = new AtomicReference<>();
- public DataSourceImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
- final PipelineJobProgressListener jobProgressListener) {
+ public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
this.importerConfig = importerConfig;
rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
- this.dataSourceManager = (PipelineDataSourceManager) importerConnector.getConnector();
- this.channel = channel;
+ this.dataSourceManager = dataSourceManager;
pipelineSqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, importerConfig.getDataSourceConfig().getDatabaseType().getType());
- this.jobProgressListener = jobProgressListener;
}
@Override
- protected void runBlocking() {
- int batchSize = importerConfig.getBatchSize();
- while (isRunning()) {
- List<Record> records = channel.fetchRecords(batchSize, 3, TimeUnit.SECONDS);
- if (!records.isEmpty()) {
- PipelineJobProgressUpdatedParameter updatedParam = flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
- channel.ack(records);
- jobProgressListener.onProgressUpdated(updatedParam);
- if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
- break;
- }
- }
- }
+ public boolean identifierMatched(final Object identifier) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PipelineJobProgressUpdatedParameter write(final String ackId, final List<Record> records) {
+ return flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
}
private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final List<Record> buffer) {
@@ -140,7 +123,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
@SneakyThrows(InterruptedException.class)
private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
- for (int i = 0; isRunning() && i <= importerConfig.getRetryTimes(); i++) {
+ for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
try {
doFlush(dataSource, buffer);
return;
@@ -310,9 +293,9 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
}
@Override
- protected void doStop() throws SQLException {
- cancelStatement(batchInsertStatement.get());
- cancelStatement(updateStatement.get());
- cancelStatement(batchDeleteStatement.get());
+ public void close() {
+ PipelineJdbcUtils.cancelStatement(batchInsertStatement.get());
+ PipelineJdbcUtils.cancelStatement(updateStatement.get());
+ PipelineJdbcUtils.cancelStatement(batchDeleteStatement.get());
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallbacks.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallbacks.java
new file mode 100644
index 00000000000..e8735baefbb
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallbacks.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.channel;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Ack callback utilities.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class AckCallbacks {
+
+ /**
+ * Ack callback for inventory dump.
+ *
+ * @param records record list
+ * @param position ingest position
+ */
+ public static void inventoryCallback(final List<Record> records, final AtomicReference<IngestPosition> position) {
+ Record lastRecord = records.get(records.size() - 1);
+ position.set(lastRecord.getPosition());
+ }
+
+ /**
+ * Ack callback for incremental dump.
+ *
+ * @param records record list
+ * @param progress incremental task progress
+ */
+ public static void incrementalCallback(final List<Record> records, final IncrementalTaskProgress progress) {
+ Record lastHandledRecord = records.get(records.size() - 1);
+ if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
+ progress.setPosition(lastHandledRecord.getPosition());
+ progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());
+ }
+ progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index 2e09b725c51..e4e77fe118d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -82,10 +82,20 @@ public final class MultiplexMemoryPipelineChannel implements PipelineChannel {
}
@Override
- public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
+ public List<Record> fetchRecords(final int batchSize, final long timeout, final TimeUnit timeUnit) {
return findChannel().fetchRecords(batchSize, timeout, timeUnit);
}
+ @Override
+ public List<Record> peekRecords() {
+ return findChannel().peekRecords();
+ }
+
+ @Override
+ public List<Record> pollRecords() {
+ return findChannel().pollRecords();
+ }
+
@Override
public void ack(final List<Record> records) {
findChannel().ack(records);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
index 605c3d25849..82a5a8fa071 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
@@ -51,25 +52,38 @@ public final class SimpleMemoryPipelineChannel implements PipelineChannel {
@SneakyThrows(InterruptedException.class)
// TODO thread-safe?
@Override
- public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
+ public List<Record> fetchRecords(final int batchSize, final long timeout, final TimeUnit timeUnit) {
List<Record> result = new LinkedList<>();
- long start = System.currentTimeMillis();
+ long startMillis = System.currentTimeMillis();
+ long timeoutMillis = timeUnit.toMillis(timeout);
int recordsCount = 0;
- while (batchSize > recordsCount) {
+ do {
List<Record> records = queue.poll();
if (null == records || records.isEmpty()) {
- TimeUnit.MILLISECONDS.sleep(Math.min(100, timeUnit.toMillis(timeout)));
+ TimeUnit.MILLISECONDS.sleep(Math.min(100, timeoutMillis));
} else {
recordsCount += records.size();
result.addAll(records);
}
- if (timeUnit.toMillis(timeout) <= System.currentTimeMillis() - start) {
+ if (recordsCount >= batchSize) {
break;
}
- }
+ } while (System.currentTimeMillis() - startMillis < timeoutMillis);
return result;
}
+ @Override
+ public List<Record> peekRecords() {
+ List<Record> result = queue.peek();
+ return null != result ? result : Collections.emptyList();
+ }
+
+ @Override
+ public List<Record> pollRecords() {
+ List<Record> result = queue.poll();
+ return null != result ? result : Collections.emptyList();
+ }
+
@Override
public void ack(final List<Record> records) {
ackCallback.onAck(records);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 4c6f41b3ef7..576b437728f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -220,7 +220,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
}
@Override
- protected void doStop() throws SQLException {
- cancelStatement(dumpStatement.get());
+ protected void doStop() {
+ PipelineJdbcUtils.cancelStatement(dumpStatement.get());
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 452be0e1520..6d58c4c8fe7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -103,7 +103,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;
- private void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
+ protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
String jobId = jobItemContext.getJobId();
log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
index bd46511735c..4826c302077 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
@@ -59,6 +59,16 @@ public final class PipelineJobCenter {
return JOB_MAP.containsKey(jobId);
}
+ /**
+ * Get job.
+ *
+ * @param jobId job id
+ * @return job
+ */
+ public static PipelineJob getJob(final String jobId) {
+ return JOB_MAP.get(jobId);
+ }
+
/**
* Stop job.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index c6b67dd0720..116df621efb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -24,6 +24,9 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.NoUniqueKeyPosition;
@@ -36,10 +39,12 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
@@ -57,6 +62,8 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Inventory data task splitter.
@@ -80,16 +87,26 @@ public final class InventoryTaskSplitter {
public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemContext jobItemContext) {
List<InventoryTask> result = new LinkedList<>();
long startTimeMillis = System.currentTimeMillis();
- PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
- for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, dumperConfig)) {
- result.add(new InventoryTask(each, importerConfig, pipelineChannelCreator, jobItemContext.getImporterConnector(), sourceDataSource, jobItemContext.getSourceMetaDataLoader(),
- jobItemContext.getJobProcessContext().getInventoryDumperExecuteEngine(), jobItemContext.getJobProcessContext().getInventoryImporterExecuteEngine(), jobItemContext));
+ InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext();
+ for (InventoryDumperConfiguration each : splitInventoryDumperConfig(jobItemContext)) {
+ AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
+ PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
+ Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
+ Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext);
+ result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
+ processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
}
log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
return result;
}
- private Collection<InventoryDumperConfiguration> splitDumperConfig(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig) {
+ /**
+ * Split inventory dumper configuration.
+ *
+ * @param jobItemContext job item context
+ * @return inventory dumper configurations
+ */
+ public Collection<InventoryDumperConfiguration> splitInventoryDumperConfig(final InventoryIncrementalJobItemContext jobItemContext) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index e028e86a492..db0d0d8c3cb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -21,39 +21,21 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.util.Collection;
import java.util.LinkedList;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
* Incremental task.
*/
+@RequiredArgsConstructor
@Slf4j
-@ToString(exclude = {"incrementalExecuteEngine", "channel", "dumper", "importers", "taskProgress"})
+@ToString(exclude = {"incrementalExecuteEngine", "dumper", "importers", "taskProgress"})
public final class IncrementalTask implements PipelineTask {
@Getter
@@ -61,8 +43,6 @@ public final class IncrementalTask implements PipelineTask {
private final ExecuteEngine incrementalExecuteEngine;
- private final PipelineChannel channel;
-
private final Dumper dumper;
private final Collection<Importer> importers;
@@ -70,57 +50,12 @@ public final class IncrementalTask implements PipelineTask {
@Getter
private final IncrementalTaskProgress taskProgress;
- // TODO simplify parameters
- public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
- final PipelineChannelCreator pipelineChannelCreator, final ImporterConnector importerConnector,
- final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalExecuteEngine,
- final InventoryIncrementalJobItemContext jobItemContext) {
- taskId = dumperConfig.getDataSourceName();
- this.incrementalExecuteEngine = incrementalExecuteEngine;
- IngestPosition position = dumperConfig.getPosition();
- taskProgress = createIncrementalTaskProgress(position, jobItemContext.getInitProgress());
- channel = createChannel(concurrency, pipelineChannelCreator, taskProgress);
- dumper = PipelineTypedSPILoader.getDatabaseTypedService(
- IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType()).createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
- importers = createImporters(concurrency, importerConfig, importerConnector, channel, jobItemContext);
- }
-
- private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition position, final InventoryIncrementalJobItemProgress jobItemProgress) {
- IncrementalTaskProgress result = new IncrementalTaskProgress(position);
- if (null != jobItemProgress && null != jobItemProgress.getIncremental()) {
- Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
- .ifPresent(optional -> result.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
- }
- return result;
- }
-
- private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
- final PipelineJobProgressListener jobProgressListener) {
- Collection<Importer> result = new LinkedList<>();
- for (int i = 0; i < concurrency; i++) {
- result.add(TypedSPILoader.getService(ImporterCreator.class, importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener,
- ImporterType.INCREMENTAL));
- }
- return result;
- }
-
- private PipelineChannel createChannel(final int concurrency, final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) {
- return pipelineChannelCreator.createPipelineChannel(concurrency, 5, records -> {
- Record lastHandledRecord = records.get(records.size() - 1);
- if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
- progress.setPosition(lastHandledRecord.getPosition());
- progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());
- }
- progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
- });
- }
-
@Override
public Collection<CompletableFuture<?>> start() {
taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
Collection<CompletableFuture<?>> result = new LinkedList<>();
- result.add(incrementalExecuteEngine.submit(dumper, new JobExecuteCallback(taskId, "incremental dumper")));
- importers.forEach(each -> result.add(incrementalExecuteEngine.submit(each, new JobExecuteCallback(taskId, "importer"))));
+ result.add(incrementalExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
+ importers.forEach(each -> result.add(incrementalExecuteEngine.submit(each, new TaskExecuteCallback(this))));
return result;
}
@@ -134,25 +69,5 @@ public final class IncrementalTask implements PipelineTask {
@Override
public void close() {
- channel.close();
- }
-
- @RequiredArgsConstructor
- private class JobExecuteCallback implements ExecuteCallback {
-
- private final String taskId;
-
- private final String jobType;
-
- @Override
- public void onSuccess() {
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- log.error("{} on failure, task ID={}", jobType, taskId);
- IncrementalTask.this.stop();
- IncrementalTask.this.close();
- }
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index e5dc2fceb3b..5f10edf1712 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -20,15 +20,16 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.util.Collection;
@@ -43,7 +44,7 @@ import java.util.concurrent.CompletableFuture;
public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
@Getter
- private final PipelineJobItemContext jobItemContext;
+ private final InventoryIncrementalJobItemContext jobItemContext;
private final Collection<PipelineTask> inventoryTasks;
@@ -51,10 +52,10 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
private final PipelineJobAPI jobAPI;
- public InventoryIncrementalTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<PipelineTask> inventoryTasks, final Collection<PipelineTask> incrementalTasks) {
+ public InventoryIncrementalTasksRunner(final InventoryIncrementalJobItemContext jobItemContext) {
this.jobItemContext = jobItemContext;
- this.inventoryTasks = inventoryTasks;
- this.incrementalTasks = incrementalTasks;
+ inventoryTasks = jobItemContext.getInventoryTasks();
+ incrementalTasks = jobItemContext.getIncrementalTasks();
jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName());
}
@@ -63,11 +64,11 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
jobItemContext.setStopping(true);
for (PipelineTask each : inventoryTasks) {
each.stop();
- each.close();
+ CloseUtils.closeQuietly(each);
}
for (PipelineTask each : incrementalTasks) {
each.stop();
- each.close();
+ CloseUtils.closeQuietly(each);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 4adad77d972..22a6471effb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -18,28 +18,15 @@
package org.apache.shardingsphere.data.pipeline.core.task;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
-import javax.sql.DataSource;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
@@ -48,7 +35,8 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Inventory task.
*/
-@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "channel", "dumper", "importer"})
+@RequiredArgsConstructor
+@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "dumper", "importer"})
@Slf4j
public final class InventoryTask implements PipelineTask {
@@ -59,73 +47,20 @@ public final class InventoryTask implements PipelineTask {
private final ExecuteEngine inventoryImporterExecuteEngine;
- private final PipelineChannel channel;
-
private final Dumper dumper;
private final Importer importer;
private final AtomicReference<IngestPosition> position;
- public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
- final PipelineChannelCreator pipelineChannelCreator, final ImporterConnector importerConnector,
- final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader,
- final ExecuteEngine inventoryDumperExecuteEngine, final ExecuteEngine inventoryImporterExecuteEngine,
- final PipelineJobProgressListener jobProgressListener) {
- taskId = generateTaskId(inventoryDumperConfig);
- this.inventoryDumperExecuteEngine = inventoryDumperExecuteEngine;
- this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
- channel = createChannel(pipelineChannelCreator, importerConfig.getBatchSize());
- dumper = new InventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
- importer = TypedSPILoader.getService(ImporterCreator.class,
- importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener, ImporterType.INVENTORY);
- position = new AtomicReference<>(inventoryDumperConfig.getPosition());
- }
-
- private String generateTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
- String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getActualTableName());
- return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
- }
-
@Override
public Collection<CompletableFuture<?>> start() {
Collection<CompletableFuture<?>> result = new LinkedList<>();
- result.add(inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
-
- @Override
- public void onSuccess() {
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- log.error("dumper onFailure, taskId={}", taskId);
- stop();
- close();
- }
- }));
- result.add(inventoryImporterExecuteEngine.submit(importer, new ExecuteCallback() {
-
- @Override
- public void onSuccess() {
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- log.error("importer onFailure, taskId={}", taskId);
- stop();
- close();
- }
- }));
+ result.add(inventoryDumperExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
+ result.add(inventoryImporterExecuteEngine.submit(importer, new TaskExecuteCallback(this)));
return result;
}
- private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator, final int batchSize) {
- return pipelineChannelCreator.createPipelineChannel(1, batchSize, records -> {
- Record lastRecord = records.get(records.size() - 1);
- position.set(lastRecord.getPosition());
- });
- }
-
@Override
public void stop() {
dumper.stop();
@@ -139,6 +74,5 @@ public final class InventoryTask implements PipelineTask {
@Override
public void close() {
- channel.close();
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index 1b090ee9b2a..b0000fe97d6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,13 +19,14 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
+import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/**
* Pipeline task interface.
*/
-public interface PipelineTask {
+public interface PipelineTask extends Closeable {
/**
* Start task.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
new file mode 100644
index 00000000000..15decabf3e0
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.task;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.AckCallbacks;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Pipeline task utilities.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineTaskUtils {
+
+ /**
+ * Generate inventory task id.
+ *
+ * @param inventoryDumperConfig inventory dumper configuration
+ * @return inventory task id
+ */
+ public static String generateInventoryTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
+ String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getActualTableName());
+ return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
+ }
+
+ /**
+ * Create incremental task progress.
+ *
+ * @param position ingest position
+ * @param jobItemProgress job item progress
+ * @return incremental task progress
+ */
+ public static IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition position, final InventoryIncrementalJobItemProgress jobItemProgress) {
+ IncrementalTaskProgress result = new IncrementalTaskProgress(position);
+ if (null != jobItemProgress && null != jobItemProgress.getIncremental()) {
+ Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
+ .ifPresent(optional -> result.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
+ }
+ return result;
+ }
+
+ /**
+ * Create channel for inventory task.
+ *
+ * @param pipelineChannelCreator channel creator
+ * @param averageElementSize average element size
+ * @param position ingest position
+ * @return channel
+ */
+ public static PipelineChannel createInventoryChannel(final PipelineChannelCreator pipelineChannelCreator, final int averageElementSize, final AtomicReference<IngestPosition> position) {
+ return pipelineChannelCreator.createPipelineChannel(1, averageElementSize, records -> AckCallbacks.inventoryCallback(records, position));
+ }
+
+ /**
+ * Create incremental channel.
+ *
+ * @param concurrency output concurrency
+ * @param pipelineChannelCreator channel creator
+ * @param progress incremental task progress
+ * @return channel
+ */
+ public static PipelineChannel createIncrementalChannel(final int concurrency, final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) {
+ return pipelineChannelCreator.createPipelineChannel(concurrency, 5, records -> AckCallbacks.incrementalCallback(records, progress));
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/connector/DataSourceImporterConnector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
similarity index 62%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/connector/DataSourceImporterConnector.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
index cc336523d4c..12d63575633 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/connector/DataSourceImporterConnector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
@@ -15,28 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.importer.connector;
+package org.apache.shardingsphere.data.pipeline.core.task;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
/**
- * Default importer connector.
+ * Task execute callback.
*/
-
+@Slf4j
@RequiredArgsConstructor
-public final class DataSourceImporterConnector implements ImporterConnector {
+public final class TaskExecuteCallback implements ExecuteCallback {
- private final PipelineDataSourceManager pipelineDataSourceManager;
+ private final PipelineTask task;
@Override
- public Object getConnector() {
- return pipelineDataSourceManager;
+ public void onSuccess() {
}
@Override
- public String getType() {
- return "DataSource";
+ public void onFailure(final Throwable throwable) {
+ log.error("onFailure, task ID={}", task.getTaskId());
+ task.stop();
+ IOUtils.closeQuietly(task);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
index facdd78e1ab..121c0594d98 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
@@ -19,7 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Types;
/**
@@ -75,4 +78,21 @@ public final class PipelineJdbcUtils {
return false;
}
}
+
+ /**
+ * Cancel statement.
+ *
+ * @param statement statement
+ * @throws SQLWrapperException if cancelling statement failed
+ */
+ public static void cancelStatement(final Statement statement) throws SQLWrapperException {
+ try {
+ if (null == statement || statement.isClosed()) {
+ return;
+ }
+ statement.cancel();
+ } catch (final SQLException ex) {
+ throw new SQLWrapperException(ex);
+ }
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
deleted file mode 100644
index c0f5cce078c..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
-
-/**
- * Importer creator.
- */
-@SingletonSPI
-public interface ImporterCreator extends TypedSPI {
-
- /**
- * Create importer.
- *
- * @param importerConfig importer configuration
- * @param importerConnector import connector
- * @param channel channel
- * @param jobProgressListener job progress listener
- * @param importerType importer type
- * @return importer
- */
- Importer createImporter(ImporterConfiguration importerConfig, ImporterConnector importerConnector, PipelineChannel channel, PipelineJobProgressListener jobProgressListener,
- ImporterType importerType);
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/sink/PipelineSink.java
similarity index 53%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/sink/PipelineSink.java
index 1b090ee9b2a..e3a703c8a7c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/sink/PipelineSink.java
@@ -15,46 +15,33 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.task;
+package org.apache.shardingsphere.data.pipeline.spi.importer.sink;
-import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
+import java.io.Closeable;
+import java.util.List;
/**
- * Pipeline task interface.
+ * Pipeline sink.
*/
-public interface PipelineTask {
+public interface PipelineSink extends Closeable {
/**
- * Start task.
+ * Identifier matched or not.
*
- * @return future
+ * @param identifier sink identifier
+ * @return true if matched, otherwise false
*/
- Collection<CompletableFuture<?>> start();
+ boolean identifierMatched(Object identifier);
/**
- * Stop task.
- */
- void stop();
-
- /**
- * Get task id.
- *
- * @return task id
- */
- String getTaskId();
-
- /**
- * Get task progress.
+ * Write data.
*
- * @return task progress
- */
- TaskProgress getTaskProgress();
-
- /**
- * Close.
+ * @param ackId ack id
+ * @param records records
+ * @return job progress updated parameter
*/
- void close();
+ PipelineJobProgressUpdatedParameter write(String ackId, List<Record> records);
}
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
deleted file mode 100644
index 473c86a363d..00000000000
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.core.importer.DataSourceImporterCreator
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index ff598d5142f..c87c5a773ba 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -67,8 +67,7 @@ public final class MigrationJob extends AbstractSimplePipelineJob {
@Override
protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
- InventoryIncrementalJobItemContext jobItemContext = (InventoryIncrementalJobItemContext) pipelineJobItemContext;
- return new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
+ return new InventoryIncrementalTasksRunner((InventoryIncrementalJobItemContext) pipelineJobItemContext);
}
@Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 2643b37ddd4..e00f527cfbb 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -29,13 +29,13 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import java.util.Collection;
import java.util.LinkedList;
@@ -124,10 +124,9 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
return sourceMetaDataLoaderLazyInitializer.get();
}
- // TODO Use SPI, configurable
@Override
- public ImporterConnector getImporterConnector() {
- return new DataSourceImporterConnector(dataSourceManager);
+ public PipelineSink getSink() {
+ return new PipelineDataSourceSink(taskConfig.getImporterConfig(), dataSourceManager);
}
/**
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index f34fe078082..be3ff4883b6 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -19,19 +19,27 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.prepare;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
@@ -39,11 +47,15 @@ import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerU
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.lock.GlobalLockNames;
@@ -56,8 +68,11 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
/**
* Migration job preparer.
@@ -86,6 +101,8 @@ public final class MigrationJobPreparer {
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
+ prepareIncremental(jobItemContext);
+ initInventoryTasks(jobItemContext);
if (PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType())) {
initIncrementalTasks(jobItemContext);
if (jobItemContext.isStopping()) {
@@ -93,7 +110,6 @@ public final class MigrationJobPreparer {
return;
}
}
- initInventoryTasks(jobItemContext);
log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
}
@@ -133,8 +149,7 @@ public final class MigrationJobPreparer {
}
InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
if (null == initProgress) {
- PipelineDataSourceWrapper targetDataSource = ((PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector())
- .getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
+ PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
PipelineJobPreparerUtils.checkTargetDataSource(jobItemContext.getJobConfig().getTargetDatabaseType(), jobItemContext.getTaskConfig().getImporterConfig(),
Collections.singletonList(targetDataSource));
}
@@ -144,7 +159,7 @@ public final class MigrationJobPreparer {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
String targetDatabaseType = jobConfig.getTargetDatabaseType();
CreateTableConfiguration createTableConfig = jobItemContext.getTaskConfig().getCreateTableConfig();
- PipelineDataSourceManager dataSourceManager = (PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector();
+ PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
PrepareTargetSchemasParameter prepareTargetSchemasParam = new PrepareTargetSchemasParameter(
PipelineTypedSPILoader.getDatabaseTypedService(DatabaseType.class, targetDatabaseType), createTableConfig, dataSourceManager);
PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParam);
@@ -153,6 +168,16 @@ public final class MigrationJobPreparer {
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, new PrepareTargetTablesParameter(createTableConfig, dataSourceManager, sqlParserEngine));
}
+ private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
+ MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+ JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
+ try {
+ taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), jobItemContext.getDataSourceManager()));
+ } catch (final SQLException ex) {
+ throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
+ }
+ }
+
private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperConfig, jobItemContext.getTaskConfig().getImporterConfig());
@@ -161,21 +186,30 @@ public final class MigrationJobPreparer {
private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
- PipelineDataSourceManager dataSourceManager = (PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector();
- JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
- try {
- taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), dataSourceManager));
- } catch (final SQLException ex) {
- throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
- }
PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
+ DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
+ ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
- IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
- pipelineChannelCreator, jobItemContext.getImporterConnector(), sourceMetaDataLoader, incrementalExecuteEngine, jobItemContext);
+ IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), jobItemContext.getInitProgress());
+ PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), pipelineChannelCreator, taskProgress);
+ Dumper dumper = PipelineTypedSPILoader.getDatabaseTypedService(IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType())
+ .createIncrementalDumper(dumperConfig, dumperConfig.getPosition(), channel, sourceMetaDataLoader);
+ Collection<Importer> importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext);
+ PipelineTask incrementalTask = new IncrementalTask(dumperConfig.getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
}
+ private Collection<Importer> createImporters(final ImporterConfiguration importerConfig, final PipelineSink sink, final PipelineChannel channel,
+ final PipelineJobProgressListener jobProgressListener) {
+ Collection<Importer> result = new LinkedList<>();
+ for (int i = 0; i < importerConfig.getConcurrency(); i++) {
+ Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, sink, jobProgressListener);
+ result.add(importer);
+ }
+ return result;
+ }
+
/**
* Do cleanup work.
*
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index bde28c399e8..92f80e1bacf 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -205,7 +205,8 @@ public final class PipelineContainerComposer implements AutoCloseable {
return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("rewriteBatchedStatements", Boolean.TRUE.toString())));
}
if (DatabaseTypeUtils.isPostgreSQL(databaseType) || DatabaseTypeUtils.isOpenGauss(databaseType)) {
- return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified"), new Property("bitToString", Boolean.TRUE.toString())));
+ return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified"),
+ new Property("bitToString", Boolean.TRUE.toString()), new Property("TimeZone", "UTC")));
}
return jdbcUrl;
}
@@ -383,8 +384,10 @@ public final class PipelineContainerComposer implements AutoCloseable {
try (Connection connection = proxyDataSource.getConnection()) {
ResultSet resultSet = connection.createStatement().executeQuery(sql);
return transformResultSetToList(resultSet);
- } catch (final SQLException ex) {
- log.error("Data access error: ", ex);
+ // CHECKSTYLE:OFF
+ } catch (final SQLException | RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.error("Data access error, sql: {}.", sql, ex);
}
Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
retryNumber++;
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 98393d6cdfe..b3cfafdfbdc 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -183,7 +183,7 @@ class CDCE2EIT {
DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
CompletableFuture.runAsync(() -> new CDCClient(parameter, recordConsumer).start(), executor).whenComplete((unused, throwable) -> {
if (null != throwable) {
- log.error("cdc client sync failed, ", throwable);
+ log.error("cdc client sync failed", throwable);
}
});
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index f9d837f20e7..bc646949b34 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -66,6 +66,7 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
@Override
public void accept(final List<Record> records) {
+ log.info("Records count: {}", records.size());
log.debug("Records: {}", records);
try (Connection connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index e1b6c31d9d9..155e9cc3e0a 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -21,22 +21,18 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
@@ -153,13 +149,7 @@ class GovernanceRepositoryAPIImplTest {
dumperConfig.setLogicTableName("t_order");
dumperConfig.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
dumperConfig.setShardingItem(0);
- PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
- PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
- return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtils.getPipelineChannelCreator(), mockImporterConnector(),
- dataSource, metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
- }
-
- private ImporterConnector mockImporterConnector() {
- return new DataSourceImporterConnector(new DefaultPipelineDataSourceManager());
+ return new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(dumperConfig), PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(),
+ mock(Dumper.class), mock(Importer.class), new AtomicReference<>(new PlaceholderPosition()));
}
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporter.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporter.java
index c0c6a27ed82..23fb4908f97 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporter.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporter.java
@@ -21,11 +21,11 @@ import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
public final class FixtureImporter implements Importer {
- public FixtureImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
+ public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineSink pipelineSink, final PipelineChannel channel,
final PipelineJobProgressListener jobProgressListener) {
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
deleted file mode 100644
index 8eb83e501f8..00000000000
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-
-/**
- * Fixture importer creator.
- */
-public final class FixtureImporterCreator implements ImporterCreator {
-
- @Override
- public Importer createImporter(final ImporterConfiguration importerConfig,
- final ImporterConnector importerConnector, final PipelineChannel channel,
- final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
- return new FixtureImporter(importerConfig, importerConnector, channel, jobProgressListener);
- }
-
- @Override
- public String getType() {
- return "FIXTURE";
- }
-}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
index e344c3123e9..8cd699c3f11 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import java.util.Collection;
@@ -61,7 +61,7 @@ public final class FixtureInventoryIncrementalJobItemContext implements Inventor
}
@Override
- public ImporterConnector getImporterConnector() {
+ public PipelineSink getSink() {
return null;
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
similarity index 86%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index a02398e7e49..07ad96e23ea 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -31,9 +31,9 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.core.importer.DataSourceImporter;
-import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -50,15 +50,17 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-class DataSourceImporterTest {
+class PipelineDataSourceSinkTest {
private static final String TABLE_NAME = "test_table";
@@ -80,12 +82,12 @@ class DataSourceImporterTest {
@Mock
private PreparedStatement preparedStatement;
- private DataSourceImporter jdbcImporter;
+ private SingleChannelConsumerImporter importer;
@BeforeEach
void setUp() throws SQLException {
- ImporterConnector importerConnector = new DataSourceImporterConnector(dataSourceManager);
- jdbcImporter = new DataSourceImporter(mockImporterConfiguration(), importerConnector, channel, new FixtureInventoryIncrementalJobItemContext());
+ PipelineSink pipelineSink = new PipelineDataSourceSink(mockImporterConfiguration(), dataSourceManager);
+ importer = new SingleChannelConsumerImporter(channel, 100, 1, TimeUnit.SECONDS, pipelineSink, new FixtureInventoryIncrementalJobItemContext());
when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
when(dataSource.getConnection()).thenReturn(connection);
}
@@ -94,8 +96,8 @@ class DataSourceImporterTest {
void assertWriteInsertDataRecord() throws SQLException {
DataRecord insertRecord = getDataRecord("INSERT");
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(insertRecord));
- jdbcImporter.run();
+ when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(insertRecord));
+ importer.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);
verify(preparedStatement).setObject(3, "INSERT");
@@ -106,9 +108,9 @@ class DataSourceImporterTest {
void assertDeleteDataRecord() throws SQLException {
DataRecord deleteRecord = getDataRecord("DELETE");
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(deleteRecord));
+ when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(deleteRecord));
when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
- jdbcImporter.run();
+ importer.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);
verify(preparedStatement).addBatch();
@@ -118,8 +120,8 @@ class DataSourceImporterTest {
void assertUpdateDataRecord() throws SQLException {
DataRecord updateRecord = getDataRecord("UPDATE");
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(updateRecord));
- jdbcImporter.run();
+ when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(updateRecord));
+ importer.run();
verify(preparedStatement).setObject(1, 20);
verify(preparedStatement).setObject(2, "UPDATE");
verify(preparedStatement).setObject(3, 1);
@@ -131,8 +133,8 @@ class DataSourceImporterTest {
void assertUpdatePrimaryKeyDataRecord() throws SQLException {
DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(updateRecord));
- jdbcImporter.run();
+ when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(updateRecord));
+ importer.run();
InOrder inOrder = inOrder(preparedStatement);
inOrder.verify(preparedStatement).setObject(1, 2);
inOrder.verify(preparedStatement).setObject(2, 10);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index be205ea76ca..a6e76f8d71e 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -75,7 +75,7 @@ class InventoryTaskSplitterTest {
private void initJobItemContext() {
MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
- dataSourceManager = (PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector();
+ dataSourceManager = jobItemContext.getDataSourceManager();
}
@AfterEach
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
index 49d84f74c7a..f129614ffaf 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
@@ -17,14 +17,12 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.task;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureImporterConnector;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.AfterEach;
@@ -33,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -56,10 +55,8 @@ class IncrementalTaskTest {
void setUp() {
MigrationTaskConfiguration taskConfig = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
- PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
- incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
- PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), metaDataLoader, PipelineContextUtils.getExecuteEngine(),
- new FixtureInventoryIncrementalJobItemContext());
+ incrementalTask = new IncrementalTask("ds_0", PipelineContextUtils.getExecuteEngine(), mock(Dumper.class),
+ Collections.singletonList(mock(Importer.class)), new IncrementalTaskProgress(new PlaceholderPosition()));
}
@AfterEach
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index 8aeb66607a5..5c98f92a38d 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -21,14 +21,18 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureImporterConnector;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.AfterAll;
@@ -44,10 +48,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
class InventoryTaskTest {
@@ -73,13 +79,14 @@ class InventoryTaskTest {
@Test
void assertStartWithGetEstimatedRowsFailure() {
InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
+ AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperConfig.getPosition());
+ PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(PipelineContextUtils.getPipelineChannelCreator(), 100, position);
PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
- InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
- PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
- metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
+ InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperConfig),
+ PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(),
+ new InventoryDumper(inventoryDumperConfig, channel, dataSource, metaDataLoader), mock(Importer.class), position);
assertThrows(ExecutionException.class, () -> CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS));
- inventoryTask.close();
}
@Test
@@ -87,11 +94,9 @@ class InventoryTaskTest {
initTableData(taskConfig.getDumperConfig());
// TODO use t_order_0, and also others
InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_order", "t_order");
- PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
- PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
- InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
- PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
- metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
+ AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperConfig.getPosition());
+ InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperConfig),
+ PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), mock(Importer.class), position);
CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
inventoryTask.close();
diff --git a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
deleted file mode 100644
index 2d3a03a50cc..00000000000
--- a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureImporterCreator