You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/17 09:51:57 UTC

[shardingsphere] branch master updated: Add CDC sink config and optimize related name of CDC importer (#24213)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4007c4fb194 Add CDC sink config and optimize related name of CDC importer (#24213)
4007c4fb194 is described below

commit 4007c4fb194bd0819870d46bc63fc3036c9802a3
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Feb 17 17:51:40 2023 +0800

    Add CDC sink config and optimize related name of CDC importer (#24213)
    
    * Rename CDC importer to socket sink importer
    
    * Add CDC sink config
    
    * Fix ci
---
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 13 ++++-
 .../cdc/config/job/CDCJobConfiguration.java        | 13 +++++
 .../data/pipeline/cdc/constant/CDCSinkType.java    | 25 ++++++++++
 .../data/pipeline/cdc/core/ack/CDCAckHolder.java   | 20 ++++----
 .../SocketSinkImporterConnector.java}              | 55 +++++++++++-----------
 .../{CDCImporter.java => SocketSinkImporter.java}  | 14 +++---
 ...Creator.java => SocketSinkImporterCreator.java} |  9 ++--
 .../data/pipeline/cdc/util/CDCDataRecordUtil.java  | 29 ++++++------
 .../cdc/yaml/job/YamlCDCJobConfiguration.java      | 15 ++++++
 .../yaml/job/YamlCDCJobConfigurationSwapper.java   | 15 +++++-
 ...here.data.pipeline.spi.importer.ImporterCreator |  2 +-
 .../pipeline/cdc/core/ack/CDCAckHolderTest.java    | 32 ++++++-------
 ...est.java => SocketSinkImporterCreatorTest.java} |  8 ++--
 .../pipeline/cdc/util/CDCDataRecordUtilTest.java   | 14 +++---
 .../job/YamlCDCJobConfigurationSwapperTest.java    |  5 ++
 .../backend/handler/cdc/CDCBackendHandler.java     |  8 ++--
 16 files changed, 181 insertions(+), 96 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 30ad2964e33..4a3a3495cd7 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -48,10 +48,12 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
 import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -87,6 +89,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -106,14 +109,20 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
      * Create CDC job config.
      *
      * @param param create CDC job param
+     * @param sinkType sink type
+     * @param sinkProps sink properties
      * @return job id
      */
-    public String createJob(final StreamDataParameter param) {
+    public String createJob(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
         YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
         yamlJobConfig.setDatabase(param.getDatabase());
         yamlJobConfig.setSchemaTableNames(param.getSchemaTableNames());
         yamlJobConfig.setFull(param.isFull());
         yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
+        YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
+        sinkConfig.setSinkType(sinkType.name());
+        sinkConfig.setProps(sinkProps);
+        yamlJobConfig.setSinkConfig(sinkConfig);
         ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabase());
         yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
         List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtil.convertDataNodesToLines(param.getDataNodesMap());
@@ -127,7 +136,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
         String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
         if (repositoryAPI.isExisted(jobConfigKey)) {
-            log.warn("cdc job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
+            log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
             return jobConfig.getJobId();
         }
         repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClassName());
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 7a04f3d7bca..45a639f7fda 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -22,8 +22,10 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 
 import java.util.List;
+import java.util.Properties;
 
 /**
  * CDC job configuration.
@@ -50,6 +52,8 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
     
     private final boolean decodeWithTX;
     
+    private final SinkConfiguration sinkConfig;
+    
     private final int concurrency;
     
     private final int retryTimes;
@@ -58,4 +62,13 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
     public int getJobShardingCount() {
         return jobShardingDataNodes.size();
     }
+    
+    @RequiredArgsConstructor
+    @Getter
+    public static class SinkConfiguration {
+        
+        private final CDCSinkType sinkType;
+        
+        private final Properties props;
+    }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCSinkType.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCSinkType.java
new file mode 100644
index 00000000000..96ce4f08f5e
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCSinkType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.constant;
+
+/**
+ * CDC sink type.
+ */
+public enum CDCSinkType {
+    SOCKET
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
index 134027c25c5..7d5e07fea05 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
 
 import java.util.Map;
 import java.util.UUID;
@@ -33,7 +33,7 @@ public final class CDCAckHolder {
     
     private static final CDCAckHolder INSTANCE = new CDCAckHolder();
     
-    private final Map<String, Map<CDCImporter, CDCAckPosition>> ackIdImporterMap = new ConcurrentHashMap<>();
+    private final Map<String, Map<SocketSinkImporter, CDCAckPosition>> ackIdPositionMap = new ConcurrentHashMap<>();
     
     /**
      * the ack of CDC.
@@ -41,9 +41,9 @@ public final class CDCAckHolder {
      * @param ackId ack id
      */
     public void ack(final String ackId) {
-        Map<CDCImporter, CDCAckPosition> importerDataRecordMap = ackIdImporterMap.remove(ackId);
+        Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = ackIdPositionMap.remove(ackId);
         if (null != importerDataRecordMap) {
-            importerDataRecordMap.forEach(CDCImporter::ackWithLastDataRecord);
+            importerDataRecordMap.forEach(SocketSinkImporter::ackWithLastDataRecord);
         }
     }
     
@@ -53,10 +53,10 @@ public final class CDCAckHolder {
      * @param importerDataRecordMap import data record map
      * @return ack id
      */
-    public String bindAckIdWithPosition(final Map<CDCImporter, CDCAckPosition> importerDataRecordMap) {
+    public String bindAckIdWithPosition(final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
         String result = generateAckId();
         // TODO it's might need to persist to registry center in cluster mode.
-        ackIdImporterMap.put(result, importerDataRecordMap);
+        ackIdPositionMap.put(result, importerDataRecordMap);
         return result;
     }
     
@@ -67,13 +67,13 @@ public final class CDCAckHolder {
     /**
      * Clean up.
      *
-     * @param cdcImporter CDC importer
+     * @param socketSinkImporter CDC importer
      */
-    public void cleanUp(final CDCImporter cdcImporter) {
-        if (ackIdImporterMap.isEmpty()) {
+    public void cleanUp(final SocketSinkImporter socketSinkImporter) {
+        if (ackIdPositionMap.isEmpty()) {
             return;
         }
-        ackIdImporterMap.entrySet().removeIf(entry -> entry.getValue().containsKey(cdcImporter));
+        ackIdPositionMap.entrySet().removeIf(entry -> entry.getValue().containsKey(socketSinkImporter));
     }
     
     /**
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
similarity index 78%
rename from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index 77ccf10446c..96192c06a75 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector;
+package org.apache.shardingsphere.data.pipeline.cdc.core.connector;
 
 import io.netty.channel.Channel;
 import lombok.Getter;
@@ -26,9 +26,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataRecordUtil;
@@ -54,10 +55,10 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * CDC importer connector.
+ * Socket sink importer connector.
  */
 @Slf4j
-public final class CDCImporterConnector implements ImporterConnector {
+public final class SocketSinkImporterConnector implements ImporterConnector {
     
     private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
     
@@ -79,14 +80,14 @@ public final class CDCImporterConnector implements ImporterConnector {
     
     private final Map<String, String> tableNameSchemaMap = new HashMap<>();
     
-    private final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap = new ConcurrentHashMap<>();
+    private final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap = new ConcurrentHashMap<>();
     
     private final AtomicInteger runningIncrementalTaskCount = new AtomicInteger(0);
     
     private Thread incrementalImporterTask;
     
-    public CDCImporterConnector(final Channel channel, final String database, final int jobShardingCount, final Collection<String> schemaTableNames,
-                                final Comparator<DataRecord> dataRecordComparator) {
+    public SocketSinkImporterConnector(final Channel channel, final String database, final int jobShardingCount, final Collection<String> schemaTableNames,
+                                       final Comparator<DataRecord> dataRecordComparator) {
         this.channel = channel;
         this.database = database;
         this.jobShardingCount = jobShardingCount;
@@ -106,29 +107,29 @@ public final class CDCImporterConnector implements ImporterConnector {
      * Write data record into channel.
      *
      * @param recordList data records
-     * @param cdcImporter cdc importer
+     * @param socketSinkImporter cdc importer
      * @param importerType importer type
      */
-    public void write(final List<Record> recordList, final CDCImporter cdcImporter, final ImporterType importerType) {
+    public void write(final List<Record> recordList, final SocketSinkImporter socketSinkImporter, final ImporterType importerType) {
         if (recordList.isEmpty()) {
             return;
         }
         if (ImporterType.INVENTORY == importerType || null == dataRecordComparator) {
-            Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
+            Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
             int dataRecordCount = (int) recordList.stream().filter(each -> each instanceof DataRecord).count();
             Record lastRecord = recordList.get(recordList.size() - 1);
             if (lastRecord instanceof FinishedRecord && 0 == dataRecordCount) {
-                cdcImporter.ackWithLastDataRecord(new CDCAckPosition(lastRecord, 0));
+                socketSinkImporter.ackWithLastDataRecord(new CDCAckPosition(lastRecord, 0));
                 return;
             }
-            importerDataRecordMap.put(cdcImporter, new CDCAckPosition(RecordUtil.getLastNormalRecord(recordList), dataRecordCount));
+            importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(RecordUtil.getLastNormalRecord(recordList), dataRecordCount));
             writeImmediately(recordList, importerDataRecordMap);
         } else if (ImporterType.INCREMENTAL == importerType) {
-            writeIntoQueue(recordList, cdcImporter);
+            writeIntoQueue(recordList, socketSinkImporter);
         }
     }
     
-    private void writeImmediately(final List<? extends Record> recordList, final Map<CDCImporter, CDCAckPosition> importerDataRecordMap) {
+    private void writeImmediately(final List<? extends Record> recordList, final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
         while (!channel.isWritable() && channel.isActive()) {
             doAwait();
         }
@@ -159,8 +160,8 @@ public final class CDCImporterConnector implements ImporterConnector {
     }
     
     @SneakyThrows(InterruptedException.class)
-    private void writeIntoQueue(final List<Record> dataRecords, final CDCImporter cdcImporter) {
-        BlockingQueue<Record> blockingQueue = incrementalRecordMap.get(cdcImporter);
+    private void writeIntoQueue(final List<Record> dataRecords, final SocketSinkImporter socketSinkImporter) {
+        BlockingQueue<Record> blockingQueue = incrementalRecordMap.get(socketSinkImporter);
         if (null == blockingQueue) {
             log.warn("not find the queue to write");
             return;
@@ -171,13 +172,13 @@ public final class CDCImporterConnector implements ImporterConnector {
     }
     
     /**
-     * Send finished record event.
+     * Send incremental start event.
      *
-     * @param cdcImporter cdc importer
+     * @param socketSinkImporter socket sink importer
      * @param batchSize batch size
      */
-    public void sendIncrementalStartEvent(final CDCImporter cdcImporter, final int batchSize) {
-        incrementalRecordMap.computeIfAbsent(cdcImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
+    public void sendIncrementalStartEvent(final SocketSinkImporter socketSinkImporter, final int batchSize) {
+        incrementalRecordMap.computeIfAbsent(socketSinkImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
         int count = runningIncrementalTaskCount.incrementAndGet();
         if (count < jobShardingCount || null == dataRecordComparator) {
             return;
@@ -190,20 +191,20 @@ public final class CDCImporterConnector implements ImporterConnector {
     }
     
     /**
-     * Clean CDC importer connector.
+     * Clean socket sink importer connector.
      *
-     * @param cdcImporter CDC importer
+     * @param socketSinkImporter CDC importer
      */
-    public void clean(final CDCImporter cdcImporter) {
-        incrementalRecordMap.remove(cdcImporter);
-        if (ImporterType.INCREMENTAL == cdcImporter.getImporterType()) {
+    public void clean(final SocketSinkImporter socketSinkImporter) {
+        incrementalRecordMap.remove(socketSinkImporter);
+        if (ImporterType.INCREMENTAL == socketSinkImporter.getImporterType()) {
             incrementalTaskRunning = false;
         }
     }
     
     @Override
     public String getType() {
-        return "CDC";
+        return CDCSinkType.SOCKET.name();
     }
     
     @RequiredArgsConstructor
@@ -214,7 +215,7 @@ public final class CDCImporterConnector implements ImporterConnector {
         @Override
         public void run() {
             while (incrementalTaskRunning) {
-                Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
+                Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
                 List<DataRecord> dataRecords = new LinkedList<>();
                 for (int i = 0; i < batchSize; i++) {
                     DataRecord minimumDataRecord = CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(incrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
similarity index 88%
rename from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
index 9b066b6dee8..b267591a84d 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
@@ -32,7 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.Pipelin
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -43,17 +43,17 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * CDC importer.
+ * Socket sink importer.
  */
 @Slf4j
-public final class CDCImporter extends AbstractLifecycleExecutor implements Importer {
+public final class SocketSinkImporter extends AbstractLifecycleExecutor implements Importer {
     
     @Getter(AccessLevel.PROTECTED)
     private final ImporterConfiguration importerConfig;
     
     private final PipelineChannel channel;
     
-    private final CDCImporterConnector importerConnector;
+    private final SocketSinkImporterConnector importerConnector;
     
     private final PipelineJobProgressListener jobProgressListener;
     
@@ -62,12 +62,12 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
     
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
     
-    public CDCImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel, final PipelineJobProgressListener jobProgressListener,
-                       final ImporterType importerType) {
+    public SocketSinkImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
+                              final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
         this.importerConfig = importerConfig;
         rateLimitAlgorithm = null == importerConfig ? null : importerConfig.getRateLimitAlgorithm();
         this.channel = channel;
-        this.importerConnector = (CDCImporterConnector) importerConnector;
+        this.importerConnector = (SocketSinkImporterConnector) importerConnector;
         this.jobProgressListener = jobProgressListener;
         this.importerType = importerType;
     }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
similarity index 84%
rename from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
index 9f1034ef0eb..0bbefdd059c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
@@ -21,23 +21,24 @@ import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
 /**
- * CDC importer creator.
+ * Socket sink importer creator.
  */
-public final class CDCImporterCreator implements ImporterCreator {
+public final class SocketSinkImporterCreator implements ImporterCreator {
     
     @Override
     public Importer createImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
                                    final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
-        return new CDCImporter(importerConfig, importerConnector, channel, jobProgressListener, importerType);
+        return new SocketSinkImporter(importerConfig, importerConnector, channel, jobProgressListener, importerType);
     }
     
     @Override
     public String getType() {
-        return "CDC";
+        return CDCSinkType.SOCKET.name();
     }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
index 295d03cabfe..bd3edd8cb10 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.util;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
 
 import java.util.Comparator;
 import java.util.HashMap;
@@ -41,8 +41,8 @@ public final class CDCDataRecordUtil {
      * @param cdcAckPositionMap CDC ack position map.
      * @return minimum data record
      */
-    public static DataRecord findMinimumDataRecordAndSavePosition(final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final Comparator<DataRecord> dataRecordComparator,
-                                                                  final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap) {
+    public static DataRecord findMinimumDataRecordAndSavePosition(final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap, final Comparator<DataRecord> dataRecordComparator,
+                                                                  final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
         if (null == dataRecordComparator) {
             return findMinimumDataRecordWithoutComparator(incrementalRecordMap, cdcAckPositionMap);
         } else {
@@ -50,8 +50,9 @@ public final class CDCDataRecordUtil {
         }
     }
     
-    private static DataRecord findMinimumDataRecordWithoutComparator(final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap) {
-        for (Entry<CDCImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
+    private static DataRecord findMinimumDataRecordWithoutComparator(final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
+                                                                     final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
+        for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
             Record record = entry.getValue().poll();
             if (!(record instanceof DataRecord)) {
                 continue;
@@ -62,20 +63,20 @@ public final class CDCDataRecordUtil {
         return null;
     }
     
-    private static void saveAckPosition(final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap, final CDCImporter cdcImporter, final Record record) {
-        CDCAckPosition cdcAckPosition = cdcAckPositionMap.get(cdcImporter);
+    private static void saveAckPosition(final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final SocketSinkImporter socketSinkImporter, final Record record) {
+        CDCAckPosition cdcAckPosition = cdcAckPositionMap.get(socketSinkImporter);
         if (null == cdcAckPosition) {
-            cdcAckPositionMap.put(cdcImporter, new CDCAckPosition(record, 1));
+            cdcAckPositionMap.put(socketSinkImporter, new CDCAckPosition(record, 1));
         } else {
             cdcAckPosition.setLastRecord(record);
             cdcAckPosition.setDataRecordCount(cdcAckPosition.getDataRecordCount());
         }
     }
     
-    private static DataRecord findMinimumDataRecordWithComparator(final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap,
-                                                                  final Comparator<DataRecord> dataRecordComparator) {
-        Map<CDCImporter, DataRecord> waitSortedMap = new HashMap<>();
-        for (Entry<CDCImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
+    private static DataRecord findMinimumDataRecordWithComparator(final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
+                                                                  final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final Comparator<DataRecord> dataRecordComparator) {
+        Map<SocketSinkImporter, DataRecord> waitSortedMap = new HashMap<>();
+        for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
             Record peek = entry.getValue().peek();
             if (null == peek) {
                 continue;
@@ -88,8 +89,8 @@ public final class CDCDataRecordUtil {
             return null;
         }
         DataRecord minRecord = null;
-        CDCImporter belongImporter = null;
-        for (Entry<CDCImporter, DataRecord> entry : waitSortedMap.entrySet()) {
+        SocketSinkImporter belongImporter = null;
+        for (Entry<SocketSinkImporter, DataRecord> entry : waitSortedMap.entrySet()) {
             if (null == minRecord) {
                 minRecord = entry.getValue();
                 belongImporter = entry.getKey();
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
index 2d75a23931c..c0c18380d7e 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJ
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 
 import java.util.List;
+import java.util.Properties;
 
 /**
  * CDC job configuration for YAML.
@@ -49,6 +50,8 @@ public final class YamlCDCJobConfiguration implements YamlPipelineJobConfigurati
     
     private boolean decodeWithTX;
     
+    private YamlSinkConfiguration sinkConfig;
+    
     private int concurrency = 1;
     
     private int retryTimes;
@@ -57,4 +60,16 @@ public final class YamlCDCJobConfiguration implements YamlPipelineJobConfigurati
     public String getTargetDatabaseName() {
         throw new UnsupportedOperationException();
     }
+    
+    /**
+     * Sink configuration for YAML.
+     */
+    @Getter
+    @Setter
+    public static class YamlSinkConfiguration {
+        
+        private String sinkType;
+        
+        private Properties props;
+    }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 8555a1e765c..2ec6780748c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -21,6 +21,9 @@ import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration.SinkConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
@@ -48,20 +51,30 @@ public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSw
         List<String> jobShardingDataNodes = null == data.getJobShardingDataNodes() ? null : data.getJobShardingDataNodes().stream().map(JobDataNodeLine::marshal).collect(Collectors.toList());
         result.setJobShardingDataNodes(jobShardingDataNodes);
         result.setDecodeWithTX(data.isDecodeWithTX());
+        result.setSinkConfig(swapToYamlSinkConfiguration(data.getSinkConfig()));
         result.setConcurrency(data.getConcurrency());
         result.setRetryTimes(0);
         return result;
     }
     
+    private YamlSinkConfiguration swapToYamlSinkConfiguration(final SinkConfiguration sinkConfig) {
+        YamlSinkConfiguration result = new YamlSinkConfiguration();
+        result.setSinkType(sinkConfig.getSinkType().name());
+        result.setProps(sinkConfig.getProps());
+        return result;
+    }
+    
     @Override
     public CDCJobConfiguration swapToObject(final YamlCDCJobConfiguration yamlConfig) {
         List<JobDataNodeLine> jobShardingDataNodes = null == yamlConfig.getJobShardingDataNodes()
                 ? Collections.emptyList()
                 : yamlConfig.getJobShardingDataNodes().stream().map(JobDataNodeLine::unmarshal).collect(Collectors.toList());
+        YamlSinkConfiguration yamlSinkConfig = yamlConfig.getSinkConfig();
+        SinkConfiguration sinkConfig = new SinkConfiguration(CDCSinkType.valueOf(yamlSinkConfig.getSinkType()), yamlSinkConfig.getProps());
         JobDataNodeLine tablesFirstDataNodes = null == yamlConfig.getTablesFirstDataNodes() ? null : JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
         return new CDCJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabase(), yamlConfig.getSchemaTableNames(), yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
                 (ShardingSpherePipelineDataSourceConfiguration) dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()), tablesFirstDataNodes,
-                jobShardingDataNodes, yamlConfig.isDecodeWithTX(), yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
+                jobShardingDataNodes, yamlConfig.isDecodeWithTX(), sinkConfig, yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
     }
     
     /**
diff --git a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
index cb3296d6a69..8b03ec4f243 100644
--- a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
+++ b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterCreator
+org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporterCreator
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
index bc15f22efe3..7ab1b895e3f 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
 import org.apache.shardingsphere.infra.util.reflection.ReflectionUtil;
 import org.junit.Test;
 
@@ -37,28 +37,28 @@ public final class CDCAckHolderTest {
     @Test
     public void assertBindAckIdWithPositionAndAck() {
         CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
-        final Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
-        CDCImporter cdcImporter = mock(CDCImporter.class);
-        importerDataRecordMap.put(cdcImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0));
-        Optional<Map<String, Map<CDCImporter, CDCAckPosition>>> ackIdImporterMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdImporterMap");
-        assertTrue(ackIdImporterMap.isPresent());
-        assertTrue(ackIdImporterMap.get().isEmpty());
+        final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
+        SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
+        importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0));
+        Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>> ackIdPositionMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdPositionMap");
+        assertTrue(ackIdPositionMap.isPresent());
+        assertTrue(ackIdPositionMap.get().isEmpty());
         String ackId = cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
-        assertThat(ackIdImporterMap.get().size(), is(1));
+        assertThat(ackIdPositionMap.get().size(), is(1));
         cdcAckHolder.ack(ackId);
-        assertTrue(ackIdImporterMap.get().isEmpty());
+        assertTrue(ackIdPositionMap.get().isEmpty());
     }
     
     @Test
     public void assertCleanUpTimeoutAckId() {
         CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
-        final Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
-        CDCImporter cdcImporter = mock(CDCImporter.class);
-        importerDataRecordMap.put(cdcImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0, System.currentTimeMillis() - 60 * 1000 * 10));
+        final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
+        SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
+        importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0, System.currentTimeMillis() - 60 * 1000 * 10));
         cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
-        cdcAckHolder.cleanUp(cdcImporter);
-        Optional<Map<String, Map<CDCImporter, CDCAckPosition>>> actualAckIdImporterMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdImporterMap");
-        assertTrue(actualAckIdImporterMap.isPresent());
-        assertTrue(actualAckIdImporterMap.get().isEmpty());
+        cdcAckHolder.cleanUp(socketSinkImporter);
+        Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>> ackIdPositionMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdPositionMap");
+        assertTrue(ackIdPositionMap.isPresent());
+        assertTrue(ackIdPositionMap.get().isEmpty());
     }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
similarity index 79%
rename from kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
rename to kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
index 13b2708399b..6662af13712 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
 
 import io.netty.channel.Channel;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.junit.Test;
@@ -34,14 +34,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.mock;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class CDCImporterCreatorTest {
+public final class SocketSinkImporterCreatorTest {
     
     @Mock
     private ImporterConfiguration importerConfig;
     
     @Test
     public void assertCreateCDCImporter() {
-        CDCImporterConnector importerConnector = new CDCImporterConnector(mock(Channel.class), "test", 1, Collections.emptyList(), null);
-        assertThat(TypedSPILoader.getService(ImporterCreator.class, "CDC").createImporter(importerConfig, importerConnector, null, null, null), instanceOf(CDCImporter.class));
+        SocketSinkImporterConnector importerConnector = new SocketSinkImporterConnector(mock(Channel.class), "test", 1, Collections.emptyList(), null);
+        assertThat(TypedSPILoader.getService(ImporterCreator.class, "Socket").createImporter(importerConfig, importerConnector, null, null, null), instanceOf(SocketSinkImporter.class));
     }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
index 658d8fcdfc3..17e0bd5a33f 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPo
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.junit.Test;
@@ -41,21 +41,21 @@ public final class CDCDataRecordUtilTest {
     
     @Test
     public void assertFindMinimumDataRecordAndSavePosition() throws InterruptedException {
-        final Map<CDCImporter, BlockingQueue<Record>> actualIncrementalRecordMap = new HashMap<>();
+        final Map<SocketSinkImporter, BlockingQueue<Record>> actualIncrementalRecordMap = new HashMap<>();
         ArrayBlockingQueue<Record> queueFirst = new ArrayBlockingQueue<>(5);
         queueFirst.put(generateDataRecord(0));
         queueFirst.put(generateDataRecord(2));
         queueFirst.put(generateDataRecord(4));
-        CDCImporter mockCdcImporterFirst = mock(CDCImporter.class);
-        actualIncrementalRecordMap.put(mockCdcImporterFirst, queueFirst);
+        SocketSinkImporter mockSocketSinkImporterFirst = mock(SocketSinkImporter.class);
+        actualIncrementalRecordMap.put(mockSocketSinkImporterFirst, queueFirst);
         ArrayBlockingQueue<Record> queueSecond = new ArrayBlockingQueue<>(5);
         queueSecond.put(generateDataRecord(1));
         queueSecond.put(generateDataRecord(3));
         queueSecond.put(generateDataRecord(5));
-        CDCImporter mockCdcImporterSecond = mock(CDCImporter.class);
-        actualIncrementalRecordMap.put(mockCdcImporterSecond, queueSecond);
+        SocketSinkImporter mockSocketSinkImporterSecond = mock(SocketSinkImporter.class);
+        actualIncrementalRecordMap.put(mockSocketSinkImporterSecond, queueSecond);
         Comparator<DataRecord> dataRecordComparator = DataRecordComparatorGenerator.generatorIncrementalComparator(new OpenGaussDatabaseType());
-        final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
+        final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
         for (long i = 0; i <= 5; i++) {
             DataRecord minimumDataRecord = CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
             assertThat(minimumDataRecord.getCsn(), is(i));
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index c127a9e8798..5a62e46a04a 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.cdc.yaml.job;
 
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -35,6 +37,9 @@ public final class YamlCDCJobConfigurationSwapperTest {
         yamlJobConfig.setDatabase("test_db");
         yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order", "t_order_item"));
         yamlJobConfig.setFull(true);
+        YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
+        sinkConfig.setSinkType(CDCSinkType.SOCKET.name());
+        yamlJobConfig.setSinkConfig(sinkConfig);
         CDCJobConfiguration actual = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
         assertThat(actual.getJobId(), is("j51017f973ac82cb1edea4f5238a258c25e89"));
         assertThat(actual.getDatabase(), is("test_db"));
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index dbbdb7f60dc..ddfe27888ba 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -25,9 +25,10 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
@@ -62,6 +63,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -112,7 +114,7 @@ public final class CDCBackendHandler {
         }
         boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
         StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
-        String jobId = jobAPI.createJob(parameter);
+        String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties());
         connectionContext.setJobId(jobId);
         startStreaming(requestId, jobId, connectionContext, channel);
         return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
@@ -206,7 +208,7 @@ public final class CDCBackendHandler {
         Comparator<DataRecord> dataRecordComparator = cdcJobConfig.isDecodeWithTX()
                 ? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
                 : null;
-        CDCJob job = new CDCJob(new CDCImporterConnector(channel, cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
+        CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
         job.setJobBootstrap(oneOffJobBootstrap);