You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/17 09:51:57 UTC
[shardingsphere] branch master updated: Add CDC sink config and optimize related name of CDC importer (#24213)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4007c4fb194 Add CDC sink config and optimize related name of CDC importer (#24213)
4007c4fb194 is described below
commit 4007c4fb194bd0819870d46bc63fc3036c9802a3
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Feb 17 17:51:40 2023 +0800
Add CDC sink config and optimize related name of CDC importer (#24213)
* Rename CDC importer to socket sink importer
* Add CDC sink config
* Fix ci
---
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 13 ++++-
.../cdc/config/job/CDCJobConfiguration.java | 13 +++++
.../data/pipeline/cdc/constant/CDCSinkType.java | 25 ++++++++++
.../data/pipeline/cdc/core/ack/CDCAckHolder.java | 20 ++++----
.../SocketSinkImporterConnector.java} | 55 +++++++++++-----------
.../{CDCImporter.java => SocketSinkImporter.java} | 14 +++---
...Creator.java => SocketSinkImporterCreator.java} | 9 ++--
.../data/pipeline/cdc/util/CDCDataRecordUtil.java | 29 ++++++------
.../cdc/yaml/job/YamlCDCJobConfiguration.java | 15 ++++++
.../yaml/job/YamlCDCJobConfigurationSwapper.java | 15 +++++-
...here.data.pipeline.spi.importer.ImporterCreator | 2 +-
.../pipeline/cdc/core/ack/CDCAckHolderTest.java | 32 ++++++-------
...est.java => SocketSinkImporterCreatorTest.java} | 8 ++--
.../pipeline/cdc/util/CDCDataRecordUtilTest.java | 14 +++---
.../job/YamlCDCJobConfigurationSwapperTest.java | 5 ++
.../backend/handler/cdc/CDCBackendHandler.java | 8 ++--
16 files changed, 181 insertions(+), 96 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 30ad2964e33..4a3a3495cd7 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -48,10 +48,12 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -87,6 +89,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -106,14 +109,20 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
* Create CDC job config.
*
* @param param create CDC job param
+ * @param sinkType sink type
+ * @param sinkProps sink properties
* @return job id
*/
- public String createJob(final StreamDataParameter param) {
+ public String createJob(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
yamlJobConfig.setDatabase(param.getDatabase());
yamlJobConfig.setSchemaTableNames(param.getSchemaTableNames());
yamlJobConfig.setFull(param.isFull());
yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
+ YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
+ sinkConfig.setSinkType(sinkType.name());
+ sinkConfig.setProps(sinkProps);
+ yamlJobConfig.setSinkConfig(sinkConfig);
ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabase());
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtil.convertDataNodesToLines(param.getDataNodesMap());
@@ -127,7 +136,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
if (repositoryAPI.isExisted(jobConfigKey)) {
- log.warn("cdc job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
+ log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
return jobConfig.getJobId();
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClassName());
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 7a04f3d7bca..45a639f7fda 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -22,8 +22,10 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import java.util.List;
+import java.util.Properties;
/**
* CDC job configuration.
@@ -50,6 +52,8 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
private final boolean decodeWithTX;
+ private final SinkConfiguration sinkConfig;
+
private final int concurrency;
private final int retryTimes;
@@ -58,4 +62,13 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
public int getJobShardingCount() {
return jobShardingDataNodes.size();
}
+
+ @RequiredArgsConstructor
+ @Getter
+ public static class SinkConfiguration {
+
+ private final CDCSinkType sinkType;
+
+ private final Properties props;
+ }
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCSinkType.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCSinkType.java
new file mode 100644
index 00000000000..96ce4f08f5e
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCSinkType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.constant;
+
+/**
+ * CDC sink type.
+ */
+public enum CDCSinkType {
+ SOCKET
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
index 134027c25c5..7d5e07fea05 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
import java.util.Map;
import java.util.UUID;
@@ -33,7 +33,7 @@ public final class CDCAckHolder {
private static final CDCAckHolder INSTANCE = new CDCAckHolder();
- private final Map<String, Map<CDCImporter, CDCAckPosition>> ackIdImporterMap = new ConcurrentHashMap<>();
+ private final Map<String, Map<SocketSinkImporter, CDCAckPosition>> ackIdPositionMap = new ConcurrentHashMap<>();
/**
* the ack of CDC.
@@ -41,9 +41,9 @@ public final class CDCAckHolder {
* @param ackId ack id
*/
public void ack(final String ackId) {
- Map<CDCImporter, CDCAckPosition> importerDataRecordMap = ackIdImporterMap.remove(ackId);
+ Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = ackIdPositionMap.remove(ackId);
if (null != importerDataRecordMap) {
- importerDataRecordMap.forEach(CDCImporter::ackWithLastDataRecord);
+ importerDataRecordMap.forEach(SocketSinkImporter::ackWithLastDataRecord);
}
}
@@ -53,10 +53,10 @@ public final class CDCAckHolder {
* @param importerDataRecordMap import data record map
* @return ack id
*/
- public String bindAckIdWithPosition(final Map<CDCImporter, CDCAckPosition> importerDataRecordMap) {
+ public String bindAckIdWithPosition(final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
String result = generateAckId();
// TODO it might need to be persisted to the registry center in cluster mode.
- ackIdImporterMap.put(result, importerDataRecordMap);
+ ackIdPositionMap.put(result, importerDataRecordMap);
return result;
}
@@ -67,13 +67,13 @@ public final class CDCAckHolder {
/**
* Clean up.
*
- * @param cdcImporter CDC importer
+ * @param socketSinkImporter socket sink importer
*/
- public void cleanUp(final CDCImporter cdcImporter) {
- if (ackIdImporterMap.isEmpty()) {
+ public void cleanUp(final SocketSinkImporter socketSinkImporter) {
+ if (ackIdPositionMap.isEmpty()) {
return;
}
- ackIdImporterMap.entrySet().removeIf(entry -> entry.getValue().containsKey(cdcImporter));
+ ackIdPositionMap.entrySet().removeIf(entry -> entry.getValue().containsKey(socketSinkImporter));
}
/**
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
similarity index 78%
rename from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index 77ccf10446c..96192c06a75 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector;
+package org.apache.shardingsphere.data.pipeline.cdc.core.connector;
import io.netty.channel.Channel;
import lombok.Getter;
@@ -26,9 +26,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataRecordUtil;
@@ -54,10 +55,10 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
- * CDC importer connector.
+ * Socket sink importer connector.
*/
@Slf4j
-public final class CDCImporterConnector implements ImporterConnector {
+public final class SocketSinkImporterConnector implements ImporterConnector {
private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
@@ -79,14 +80,14 @@ public final class CDCImporterConnector implements ImporterConnector {
private final Map<String, String> tableNameSchemaMap = new HashMap<>();
- private final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap = new ConcurrentHashMap<>();
+ private final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap = new ConcurrentHashMap<>();
private final AtomicInteger runningIncrementalTaskCount = new AtomicInteger(0);
private Thread incrementalImporterTask;
- public CDCImporterConnector(final Channel channel, final String database, final int jobShardingCount, final Collection<String> schemaTableNames,
- final Comparator<DataRecord> dataRecordComparator) {
+ public SocketSinkImporterConnector(final Channel channel, final String database, final int jobShardingCount, final Collection<String> schemaTableNames,
+ final Comparator<DataRecord> dataRecordComparator) {
this.channel = channel;
this.database = database;
this.jobShardingCount = jobShardingCount;
@@ -106,29 +107,29 @@ public final class CDCImporterConnector implements ImporterConnector {
* Write data record into channel.
*
* @param recordList data records
- * @param cdcImporter cdc importer
+ * @param socketSinkImporter socket sink importer
* @param importerType importer type
*/
- public void write(final List<Record> recordList, final CDCImporter cdcImporter, final ImporterType importerType) {
+ public void write(final List<Record> recordList, final SocketSinkImporter socketSinkImporter, final ImporterType importerType) {
if (recordList.isEmpty()) {
return;
}
if (ImporterType.INVENTORY == importerType || null == dataRecordComparator) {
- Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
+ Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
int dataRecordCount = (int) recordList.stream().filter(each -> each instanceof DataRecord).count();
Record lastRecord = recordList.get(recordList.size() - 1);
if (lastRecord instanceof FinishedRecord && 0 == dataRecordCount) {
- cdcImporter.ackWithLastDataRecord(new CDCAckPosition(lastRecord, 0));
+ socketSinkImporter.ackWithLastDataRecord(new CDCAckPosition(lastRecord, 0));
return;
}
- importerDataRecordMap.put(cdcImporter, new CDCAckPosition(RecordUtil.getLastNormalRecord(recordList), dataRecordCount));
+ importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(RecordUtil.getLastNormalRecord(recordList), dataRecordCount));
writeImmediately(recordList, importerDataRecordMap);
} else if (ImporterType.INCREMENTAL == importerType) {
- writeIntoQueue(recordList, cdcImporter);
+ writeIntoQueue(recordList, socketSinkImporter);
}
}
- private void writeImmediately(final List<? extends Record> recordList, final Map<CDCImporter, CDCAckPosition> importerDataRecordMap) {
+ private void writeImmediately(final List<? extends Record> recordList, final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
while (!channel.isWritable() && channel.isActive()) {
doAwait();
}
@@ -159,8 +160,8 @@ public final class CDCImporterConnector implements ImporterConnector {
}
@SneakyThrows(InterruptedException.class)
- private void writeIntoQueue(final List<Record> dataRecords, final CDCImporter cdcImporter) {
- BlockingQueue<Record> blockingQueue = incrementalRecordMap.get(cdcImporter);
+ private void writeIntoQueue(final List<Record> dataRecords, final SocketSinkImporter socketSinkImporter) {
+ BlockingQueue<Record> blockingQueue = incrementalRecordMap.get(socketSinkImporter);
if (null == blockingQueue) {
log.warn("not find the queue to write");
return;
@@ -171,13 +172,13 @@ public final class CDCImporterConnector implements ImporterConnector {
}
/**
- * Send finished record event.
+ * Send incremental start event.
*
- * @param cdcImporter cdc importer
+ * @param socketSinkImporter socket sink importer
* @param batchSize batch size
*/
- public void sendIncrementalStartEvent(final CDCImporter cdcImporter, final int batchSize) {
- incrementalRecordMap.computeIfAbsent(cdcImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
+ public void sendIncrementalStartEvent(final SocketSinkImporter socketSinkImporter, final int batchSize) {
+ incrementalRecordMap.computeIfAbsent(socketSinkImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
int count = runningIncrementalTaskCount.incrementAndGet();
if (count < jobShardingCount || null == dataRecordComparator) {
return;
@@ -190,20 +191,20 @@ public final class CDCImporterConnector implements ImporterConnector {
}
/**
- * Clean CDC importer connector.
+ * Clean socket sink importer connector.
*
- * @param cdcImporter CDC importer
+ * @param socketSinkImporter socket sink importer
*/
- public void clean(final CDCImporter cdcImporter) {
- incrementalRecordMap.remove(cdcImporter);
- if (ImporterType.INCREMENTAL == cdcImporter.getImporterType()) {
+ public void clean(final SocketSinkImporter socketSinkImporter) {
+ incrementalRecordMap.remove(socketSinkImporter);
+ if (ImporterType.INCREMENTAL == socketSinkImporter.getImporterType()) {
incrementalTaskRunning = false;
}
}
@Override
public String getType() {
- return "CDC";
+ return CDCSinkType.SOCKET.name();
}
@RequiredArgsConstructor
@@ -214,7 +215,7 @@ public final class CDCImporterConnector implements ImporterConnector {
@Override
public void run() {
while (incrementalTaskRunning) {
- Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
+ Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
List<DataRecord> dataRecords = new LinkedList<>();
for (int i = 0; i < batchSize; i++) {
DataRecord minimumDataRecord = CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(incrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
similarity index 88%
rename from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
index 9b066b6dee8..b267591a84d 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
@@ -32,7 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.Pipelin
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -43,17 +43,17 @@ import java.util.List;
import java.util.stream.Collectors;
/**
- * CDC importer.
+ * Socket sink importer.
*/
@Slf4j
-public final class CDCImporter extends AbstractLifecycleExecutor implements Importer {
+public final class SocketSinkImporter extends AbstractLifecycleExecutor implements Importer {
@Getter(AccessLevel.PROTECTED)
private final ImporterConfiguration importerConfig;
private final PipelineChannel channel;
- private final CDCImporterConnector importerConnector;
+ private final SocketSinkImporterConnector importerConnector;
private final PipelineJobProgressListener jobProgressListener;
@@ -62,12 +62,12 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
private final JobRateLimitAlgorithm rateLimitAlgorithm;
- public CDCImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel, final PipelineJobProgressListener jobProgressListener,
- final ImporterType importerType) {
+ public SocketSinkImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
+ final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
this.importerConfig = importerConfig;
rateLimitAlgorithm = null == importerConfig ? null : importerConfig.getRateLimitAlgorithm();
this.channel = channel;
- this.importerConnector = (CDCImporterConnector) importerConnector;
+ this.importerConnector = (SocketSinkImporterConnector) importerConnector;
this.jobProgressListener = jobProgressListener;
this.importerType = importerType;
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
similarity index 84%
rename from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
index 9f1034ef0eb..0bbefdd059c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
@@ -21,23 +21,24 @@ import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
/**
- * CDC importer creator.
+ * Socket sink importer creator.
*/
-public final class CDCImporterCreator implements ImporterCreator {
+public final class SocketSinkImporterCreator implements ImporterCreator {
@Override
public Importer createImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
- return new CDCImporter(importerConfig, importerConnector, channel, jobProgressListener, importerType);
+ return new SocketSinkImporter(importerConfig, importerConnector, channel, jobProgressListener, importerType);
}
@Override
public String getType() {
- return "CDC";
+ return CDCSinkType.SOCKET.name();
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
index 295d03cabfe..bd3edd8cb10 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.util;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
import java.util.Comparator;
import java.util.HashMap;
@@ -41,8 +41,8 @@ public final class CDCDataRecordUtil {
* @param cdcAckPositionMap CDC ack position map.
* @return minimum data record
*/
- public static DataRecord findMinimumDataRecordAndSavePosition(final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final Comparator<DataRecord> dataRecordComparator,
- final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap) {
+ public static DataRecord findMinimumDataRecordAndSavePosition(final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap, final Comparator<DataRecord> dataRecordComparator,
+ final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
if (null == dataRecordComparator) {
return findMinimumDataRecordWithoutComparator(incrementalRecordMap, cdcAckPositionMap);
} else {
@@ -50,8 +50,9 @@ public final class CDCDataRecordUtil {
}
}
- private static DataRecord findMinimumDataRecordWithoutComparator(final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap) {
- for (Entry<CDCImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
+ private static DataRecord findMinimumDataRecordWithoutComparator(final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
+ final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
+ for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
Record record = entry.getValue().poll();
if (!(record instanceof DataRecord)) {
continue;
@@ -62,20 +63,20 @@ public final class CDCDataRecordUtil {
return null;
}
- private static void saveAckPosition(final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap, final CDCImporter cdcImporter, final Record record) {
- CDCAckPosition cdcAckPosition = cdcAckPositionMap.get(cdcImporter);
+ private static void saveAckPosition(final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final SocketSinkImporter socketSinkImporter, final Record record) {
+ CDCAckPosition cdcAckPosition = cdcAckPositionMap.get(socketSinkImporter);
if (null == cdcAckPosition) {
- cdcAckPositionMap.put(cdcImporter, new CDCAckPosition(record, 1));
+ cdcAckPositionMap.put(socketSinkImporter, new CDCAckPosition(record, 1));
} else {
cdcAckPosition.setLastRecord(record);
cdcAckPosition.setDataRecordCount(cdcAckPosition.getDataRecordCount());
}
}
- private static DataRecord findMinimumDataRecordWithComparator(final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap,
- final Comparator<DataRecord> dataRecordComparator) {
- Map<CDCImporter, DataRecord> waitSortedMap = new HashMap<>();
- for (Entry<CDCImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
+ private static DataRecord findMinimumDataRecordWithComparator(final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
+ final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final Comparator<DataRecord> dataRecordComparator) {
+ Map<SocketSinkImporter, DataRecord> waitSortedMap = new HashMap<>();
+ for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
Record peek = entry.getValue().peek();
if (null == peek) {
continue;
@@ -88,8 +89,8 @@ public final class CDCDataRecordUtil {
return null;
}
DataRecord minRecord = null;
- CDCImporter belongImporter = null;
- for (Entry<CDCImporter, DataRecord> entry : waitSortedMap.entrySet()) {
+ SocketSinkImporter belongImporter = null;
+ for (Entry<SocketSinkImporter, DataRecord> entry : waitSortedMap.entrySet()) {
if (null == minRecord) {
minRecord = entry.getValue();
belongImporter = entry.getKey();
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
index 2d75a23931c..c0c18380d7e 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJ
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import java.util.List;
+import java.util.Properties;
/**
* CDC job configuration for YAML.
@@ -49,6 +50,8 @@ public final class YamlCDCJobConfiguration implements YamlPipelineJobConfigurati
private boolean decodeWithTX;
+ private YamlSinkConfiguration sinkConfig;
+
private int concurrency = 1;
private int retryTimes;
@@ -57,4 +60,16 @@ public final class YamlCDCJobConfiguration implements YamlPipelineJobConfigurati
public String getTargetDatabaseName() {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Sink configuration for YAML.
+ */
+ @Getter
+ @Setter
+ public static class YamlSinkConfiguration {
+
+ private String sinkType;
+
+ private Properties props;
+ }
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 8555a1e765c..2ec6780748c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -21,6 +21,9 @@ import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration.SinkConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
@@ -48,20 +51,30 @@ public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSw
List<String> jobShardingDataNodes = null == data.getJobShardingDataNodes() ? null : data.getJobShardingDataNodes().stream().map(JobDataNodeLine::marshal).collect(Collectors.toList());
result.setJobShardingDataNodes(jobShardingDataNodes);
result.setDecodeWithTX(data.isDecodeWithTX());
+ result.setSinkConfig(swapToYamlSinkConfiguration(data.getSinkConfig()));
result.setConcurrency(data.getConcurrency());
result.setRetryTimes(0);
return result;
}
+ private YamlSinkConfiguration swapToYamlSinkConfiguration(final SinkConfiguration sinkConfig) {
+ YamlSinkConfiguration result = new YamlSinkConfiguration();
+ result.setSinkType(sinkConfig.getSinkType().name());
+ result.setProps(sinkConfig.getProps());
+ return result;
+ }
+
@Override
public CDCJobConfiguration swapToObject(final YamlCDCJobConfiguration yamlConfig) {
List<JobDataNodeLine> jobShardingDataNodes = null == yamlConfig.getJobShardingDataNodes()
? Collections.emptyList()
: yamlConfig.getJobShardingDataNodes().stream().map(JobDataNodeLine::unmarshal).collect(Collectors.toList());
+ YamlSinkConfiguration yamlSinkConfig = yamlConfig.getSinkConfig();
+ SinkConfiguration sinkConfig = new SinkConfiguration(CDCSinkType.valueOf(yamlSinkConfig.getSinkType()), yamlSinkConfig.getProps());
JobDataNodeLine tablesFirstDataNodes = null == yamlConfig.getTablesFirstDataNodes() ? null : JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
return new CDCJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabase(), yamlConfig.getSchemaTableNames(), yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
(ShardingSpherePipelineDataSourceConfiguration) dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()), tablesFirstDataNodes,
- jobShardingDataNodes, yamlConfig.isDecodeWithTX(), yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
+ jobShardingDataNodes, yamlConfig.isDecodeWithTX(), sinkConfig, yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
}
/**
diff --git a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
index cb3296d6a69..8b03ec4f243 100644
--- a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
+++ b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterCreator
+org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporterCreator
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
index bc15f22efe3..7ab1b895e3f 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
import org.apache.shardingsphere.infra.util.reflection.ReflectionUtil;
import org.junit.Test;
@@ -37,28 +37,28 @@ public final class CDCAckHolderTest {
@Test
public void assertBindAckIdWithPositionAndAck() {
CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
- final Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
- CDCImporter cdcImporter = mock(CDCImporter.class);
- importerDataRecordMap.put(cdcImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0));
- Optional<Map<String, Map<CDCImporter, CDCAckPosition>>> ackIdImporterMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdImporterMap");
- assertTrue(ackIdImporterMap.isPresent());
- assertTrue(ackIdImporterMap.get().isEmpty());
+ final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
+ SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
+ importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0));
+ Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>> ackIdPositionMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdPositionMap");
+ assertTrue(ackIdPositionMap.isPresent());
+ assertTrue(ackIdPositionMap.get().isEmpty());
String ackId = cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
- assertThat(ackIdImporterMap.get().size(), is(1));
+ assertThat(ackIdPositionMap.get().size(), is(1));
cdcAckHolder.ack(ackId);
- assertTrue(ackIdImporterMap.get().isEmpty());
+ assertTrue(ackIdPositionMap.get().isEmpty());
}
@Test
public void assertCleanUpTimeoutAckId() {
CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
- final Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
- CDCImporter cdcImporter = mock(CDCImporter.class);
- importerDataRecordMap.put(cdcImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0, System.currentTimeMillis() - 60 * 1000 * 10));
+ final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
+ SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
+ importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0, System.currentTimeMillis() - 60 * 1000 * 10));
cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
- cdcAckHolder.cleanUp(cdcImporter);
- Optional<Map<String, Map<CDCImporter, CDCAckPosition>>> actualAckIdImporterMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdImporterMap");
- assertTrue(actualAckIdImporterMap.isPresent());
- assertTrue(actualAckIdImporterMap.get().isEmpty());
+ cdcAckHolder.cleanUp(socketSinkImporter);
+ Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>> ackIdPositionMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdPositionMap");
+ assertTrue(ackIdPositionMap.isPresent());
+ assertTrue(ackIdPositionMap.get().isEmpty());
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
similarity index 79%
rename from kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
rename to kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
index 13b2708399b..6662af13712 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
import io.netty.channel.Channel;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.junit.Test;
@@ -34,14 +34,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
-public final class CDCImporterCreatorTest {
+public final class SocketSinkImporterCreatorTest {
@Mock
private ImporterConfiguration importerConfig;
@Test
public void assertCreateCDCImporter() {
- CDCImporterConnector importerConnector = new CDCImporterConnector(mock(Channel.class), "test", 1, Collections.emptyList(), null);
- assertThat(TypedSPILoader.getService(ImporterCreator.class, "CDC").createImporter(importerConfig, importerConnector, null, null, null), instanceOf(CDCImporter.class));
+ SocketSinkImporterConnector importerConnector = new SocketSinkImporterConnector(mock(Channel.class), "test", 1, Collections.emptyList(), null);
+ assertThat(TypedSPILoader.getService(ImporterCreator.class, "Socket").createImporter(importerConfig, importerConnector, null, null, null), instanceOf(SocketSinkImporter.class));
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
index 658d8fcdfc3..17e0bd5a33f 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPo
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.junit.Test;
@@ -41,21 +41,21 @@ public final class CDCDataRecordUtilTest {
@Test
public void assertFindMinimumDataRecordAndSavePosition() throws InterruptedException {
- final Map<CDCImporter, BlockingQueue<Record>> actualIncrementalRecordMap = new HashMap<>();
+ final Map<SocketSinkImporter, BlockingQueue<Record>> actualIncrementalRecordMap = new HashMap<>();
ArrayBlockingQueue<Record> queueFirst = new ArrayBlockingQueue<>(5);
queueFirst.put(generateDataRecord(0));
queueFirst.put(generateDataRecord(2));
queueFirst.put(generateDataRecord(4));
- CDCImporter mockCdcImporterFirst = mock(CDCImporter.class);
- actualIncrementalRecordMap.put(mockCdcImporterFirst, queueFirst);
+ SocketSinkImporter mockSocketSinkImporterFirst = mock(SocketSinkImporter.class);
+ actualIncrementalRecordMap.put(mockSocketSinkImporterFirst, queueFirst);
ArrayBlockingQueue<Record> queueSecond = new ArrayBlockingQueue<>(5);
queueSecond.put(generateDataRecord(1));
queueSecond.put(generateDataRecord(3));
queueSecond.put(generateDataRecord(5));
- CDCImporter mockCdcImporterSecond = mock(CDCImporter.class);
- actualIncrementalRecordMap.put(mockCdcImporterSecond, queueSecond);
+ SocketSinkImporter mockSocketSinkImporterSecond = mock(SocketSinkImporter.class);
+ actualIncrementalRecordMap.put(mockSocketSinkImporterSecond, queueSecond);
Comparator<DataRecord> dataRecordComparator = DataRecordComparatorGenerator.generatorIncrementalComparator(new OpenGaussDatabaseType());
- final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
+ final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
for (long i = 0; i <= 5; i++) {
DataRecord minimumDataRecord = CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
assertThat(minimumDataRecord.getCsn(), is(i));
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index c127a9e8798..5a62e46a04a 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.data.pipeline.cdc.yaml.job;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.junit.Test;
import java.util.Arrays;
@@ -35,6 +37,9 @@ public final class YamlCDCJobConfigurationSwapperTest {
yamlJobConfig.setDatabase("test_db");
yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order", "t_order_item"));
yamlJobConfig.setFull(true);
+ YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
+ sinkConfig.setSinkType(CDCSinkType.SOCKET.name());
+ yamlJobConfig.setSinkConfig(sinkConfig);
CDCJobConfiguration actual = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
assertThat(actual.getJobId(), is("j51017f973ac82cb1edea4f5238a258c25e89"));
assertThat(actual.getDatabase(), is("test_db"));
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index dbbdb7f60dc..ddfe27888ba 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -25,9 +25,10 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
@@ -62,6 +63,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -112,7 +114,7 @@ public final class CDCBackendHandler {
}
boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
- String jobId = jobAPI.createJob(parameter);
+ String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties());
connectionContext.setJobId(jobId);
startStreaming(requestId, jobId, connectionContext, channel);
return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
@@ -206,7 +208,7 @@ public final class CDCBackendHandler {
Comparator<DataRecord> dataRecordComparator = cdcJobConfig.isDecodeWithTX()
? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
: null;
- CDCJob job = new CDCJob(new CDCImporterConnector(channel, cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
+ CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);