You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/02 04:47:46 UTC

[shardingsphere] branch master updated: Add show streaming list/status DistSQL for CDC (#24404)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7425026a04f Add show streaming list/status DistSQL for CDC (#24404)
7425026a04f is described below

commit 7425026a04f1ec94ced76375292bd5ba558bc617
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Mar 2 12:47:33 2023 +0800

    Add show streaming list/status DistSQL for CDC (#24404)
    
    * Add show streaming list/status DistSQL for CDC
    
    * Add DistSQL unit test
    
    * Add blank line at end
    
    * Fix codestyle and rename database to databaseName
    
    * Improve
    
    * Remove TODO and improve code
    
    * Remove redundant this
---
 ...Info.java => CDCTableBasedPipelineJobInfo.java} | 10 ++--
 .../api/pojo/TableBasedPipelineJobInfo.java        |  8 +++
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 18 ++++---
 .../pipeline/cdc/api/pojo/StreamDataParameter.java |  2 +-
 .../cdc/config/job/CDCJobConfiguration.java        |  2 +-
 .../cdc/yaml/job/YamlCDCJobConfiguration.java      |  2 +-
 .../yaml/job/YamlCDCJobConfigurationSwapper.java   |  4 +-
 .../job/YamlCDCJobConfigurationSwapperTest.java    |  4 +-
 kernel/data-pipeline/distsql/handler/pom.xml       |  5 ++
 .../query/ShowStreamingJobStatusExecutor.java}     | 47 ++++++++--------
 .../handler/query/ShowStreamingListExecutor.java   | 55 +++++++++++++++++++
 .../query/ShowMigrationJobStatusExecutor.java      | 33 ++++++------
 ....distsql.handler.ral.query.QueryableRALExecutor |  2 +
 kernel/data-pipeline/distsql/parser/pom.xml        | 12 +++++
 .../distsql/parser/autogen/CDCDistSQLStatement.g4} | 21 +++-----
 .../src/main/antlr4/imports/cdc/Alphabet.g4}       | 43 ++++++++++-----
 .../src/main/antlr4/imports/cdc/BaseRule.g4}       | 17 +-----
 .../parser/src/main/antlr4/imports/cdc/Keyword.g4} | 35 +++++++-----
 .../src/main/antlr4/imports/cdc/Literals.g4}       | 29 +++++-----
 .../src/main/antlr4/imports/cdc/RALStatement.g4}   | 27 +++++-----
 .../parser/src/main/antlr4/imports/cdc/Symbol.g4   | 62 ++++++++++++++++++++++
 .../cdc/distsql/parser/core/CDCDistSQLLexer.java}  | 19 ++++---
 .../cdc/distsql/parser/core/CDCDistSQLParser.java} | 35 ++++++------
 .../parser/core/CDCDistSQLStatementVisitor.java    | 48 +++++++++++++++++
 .../facade/CDCDistSQLStatementParserFacade.java    | 52 ++++++++++++++++++
 ...engine.spi.FeaturedDistSQLStatementParserFacade |  1 +
 .../statement/ShowStreamingListStatement.java}     | 15 ++----
 .../statement/ShowStreamingStatusStatement.java}   | 11 ++--
 .../backend/handler/cdc/CDCBackendHandler.java     | 15 +++---
 .../frontend/netty/CDCChannelInboundHandler.java   |  6 +--
 .../impl/cdc/ShowStreamingListStatementAssert.java | 46 ++++++++++++++++
 .../cdc/ShowStreamingStatusStatementAssert.java    | 48 +++++++++++++++++
 .../QueryableScalingRALStatementAssert.java        | 10 ++++
 .../cases/parser/jaxb/RootSQLParserTestCases.java  |  8 +++
 .../cdc/ShowStreamingListStatementTestCase.java    | 15 ++----
 .../cdc/ShowStreamingStatusStatementTestCase.java  | 18 ++++---
 test/it/parser/src/main/resources/case/ral/cdc.xml | 25 +++++++++
 .../src/main/resources/sql/supported/ral/cdc.xml   | 22 ++++++++
 38 files changed, 614 insertions(+), 218 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
similarity index 83%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
index 05ab6eb54c8..77e836483c7 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
@@ -21,13 +21,15 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
 /**
- * Table based pipeline job info.
+ * CDC table based pipeline job info.
  */
-@RequiredArgsConstructor
 @Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
+@RequiredArgsConstructor
+public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo {
     
     private final PipelineJobMetaData jobMetaData;
     
-    private final String table;
+    private final String databaseName;
+    
+    private final String schemaTableNames;
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
index 05ab6eb54c8..ef321c784e4 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
@@ -29,5 +29,13 @@ public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
     
     private final PipelineJobMetaData jobMetaData;
     
+    private final String databaseName;
+    
     private final String table;
+    
+    public TableBasedPipelineJobInfo(final PipelineJobMetaData jobMetaData, final String table) {
+        this.jobMetaData = jobMetaData;
+        databaseName = null;
+        this.table = table;
+    }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 76e1c96e84a..89ba6a17672 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -43,6 +43,8 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
@@ -115,7 +117,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
      */
     public String createJob(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
         YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
-        yamlJobConfig.setDatabase(param.getDatabase());
+        yamlJobConfig.setDatabaseName(param.getDatabaseName());
         yamlJobConfig.setSchemaTableNames(param.getSchemaTableNames());
         yamlJobConfig.setFull(param.isFull());
         yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
@@ -123,7 +125,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         sinkConfig.setSinkType(sinkType.name());
         sinkConfig.setProps(sinkProps);
         yamlJobConfig.setSinkConfig(sinkConfig);
-        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabase());
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
         yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
         List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtil.convertDataNodesToLines(param.getDataNodesMap());
         yamlJobConfig.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
@@ -203,7 +205,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     
     private String generateJobId(final YamlCDCJobConfiguration config) {
         // TODO generate parameter add sink type
-        CDCJobId jobId = new CDCJobId(config.getDatabase(), config.getSchemaTableNames(), config.isFull());
+        CDCJobId jobId = new CDCJobId(config.getDatabaseName(), config.getSchemaTableNames(), config.isFull());
         return marshalJobId(jobId);
     }
     
@@ -270,12 +272,12 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     }
     
     @Override
-    public PipelineJobConfiguration getJobConfiguration(final String jobId) {
+    public CDCJobConfiguration getJobConfiguration(final String jobId) {
         return getJobConfiguration(getElasticJobConfigPOJO(jobId));
     }
     
     @Override
-    protected PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+    protected CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
         return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
@@ -286,7 +288,11 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     
     @Override
     protected PipelineJobInfo getJobInfo(final String jobId) {
-        throw new UnsupportedOperationException();
+        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobId, !jobConfigPOJO.isDisabled(),
+                jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
+        CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+        return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
index a62bf5cda31..36c9ee91232 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
@@ -31,7 +31,7 @@ import java.util.Map;
 @Getter
 public final class StreamDataParameter {
     
-    private final String database;
+    private final String databaseName;
     
     private final List<String> schemaTableNames;
     
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 45a639f7fda..8df7a2783b6 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -36,7 +36,7 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
     
     private final String jobId;
     
-    private final String database;
+    private final String databaseName;
     
     private final List<String> schemaTableNames;
     
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
index c0c18380d7e..5559dc2e9ec 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
@@ -34,7 +34,7 @@ public final class YamlCDCJobConfiguration implements YamlPipelineJobConfigurati
     
     private String jobId;
     
-    private String database;
+    private String databaseName;
     
     private List<String> schemaTableNames;
     
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 2ec6780748c..8041712e616 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -42,7 +42,7 @@ public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSw
     public YamlCDCJobConfiguration swapToYamlConfiguration(final CDCJobConfiguration data) {
         YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
         result.setJobId(data.getJobId());
-        result.setDatabase(data.getDatabase());
+        result.setDatabaseName(data.getDatabaseName());
         result.setSchemaTableNames(data.getSchemaTableNames());
         result.setFull(data.isFull());
         result.setSourceDatabaseType(data.getSourceDatabaseType());
@@ -72,7 +72,7 @@ public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSw
         YamlSinkConfiguration yamlSinkConfig = yamlConfig.getSinkConfig();
         SinkConfiguration sinkConfig = new SinkConfiguration(CDCSinkType.valueOf(yamlSinkConfig.getSinkType()), yamlSinkConfig.getProps());
         JobDataNodeLine tablesFirstDataNodes = null == yamlConfig.getTablesFirstDataNodes() ? null : JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
-        return new CDCJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabase(), yamlConfig.getSchemaTableNames(), yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
+        return new CDCJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(), yamlConfig.getSchemaTableNames(), yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
                 (ShardingSpherePipelineDataSourceConfiguration) dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()), tablesFirstDataNodes,
                 jobShardingDataNodes, yamlConfig.isDecodeWithTX(), sinkConfig, yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
     }
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index 5a62e46a04a..0dd440a7f6f 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -34,7 +34,7 @@ public final class YamlCDCJobConfigurationSwapperTest {
     public void assertSwapToObject() {
         YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
         yamlJobConfig.setJobId("j51017f973ac82cb1edea4f5238a258c25e89");
-        yamlJobConfig.setDatabase("test_db");
+        yamlJobConfig.setDatabaseName("test_db");
         yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order", "t_order_item"));
         yamlJobConfig.setFull(true);
         YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
@@ -42,7 +42,7 @@ public final class YamlCDCJobConfigurationSwapperTest {
         yamlJobConfig.setSinkConfig(sinkConfig);
         CDCJobConfiguration actual = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
         assertThat(actual.getJobId(), is("j51017f973ac82cb1edea4f5238a258c25e89"));
-        assertThat(actual.getDatabase(), is("test_db"));
+        assertThat(actual.getDatabaseName(), is("test_db"));
         assertThat(actual.getSchemaTableNames(), is(Arrays.asList("test.t_order", "t_order_item")));
         assertTrue(actual.isFull());
     }
diff --git a/kernel/data-pipeline/distsql/handler/pom.xml b/kernel/data-pipeline/distsql/handler/pom.xml
index fd0697db8c7..2eec2fcde88 100644
--- a/kernel/data-pipeline/distsql/handler/pom.xml
+++ b/kernel/data-pipeline/distsql/handler/pom.xml
@@ -43,6 +43,11 @@
             <artifactId>shardingsphere-data-pipeline-scenario-migration</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-data-pipeline-cdc-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
             <artifactId>shardingsphere-data-pipeline-distsql-statement</artifactId>
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
similarity index 56%
copy from kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
copy to kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index 3edc84e8d59..f308304f44a 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.migration.distsql.handler.query;
+package org.apache.shardingsphere.cdc.distsql.handler.query;
 
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
@@ -24,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -33,32 +33,31 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
- * Show migration job status executor.
+ * Show streaming job status executor.
  */
-public final class ShowMigrationJobStatusExecutor implements QueryableRALExecutor<ShowMigrationStatusStatement> {
+public final class ShowStreamingJobStatusExecutor implements QueryableRALExecutor<ShowStreamingStatusStatement> {
     
     @Override
-    public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationStatusStatement sqlStatement) {
-        InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
+    public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
+        InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "CDC");
         List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
-        return jobItemInfos.stream().map(each -> {
-            LocalDataQueryResultRow row;
-            InventoryIncrementalJobItemProgress jobItemProgress = each.getJobItemProgress();
-            if (null != jobItemProgress) {
-                String incrementalIdleSeconds = "";
-                if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
-                    long latestActiveTimeMillis = Math.max(each.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
-                    incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
-                }
-                row = new LocalDataQueryResultRow(each.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
-                        jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), each.getInventoryFinishedPercentage(),
-                        incrementalIdleSeconds, each.getErrorMessage());
-            } else {
-                row = new LocalDataQueryResultRow(each.getShardingItem(), "", "", "", "", "", "", each.getErrorMessage());
-            }
-            return row;
-        }).collect(Collectors.toList());
+        return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
+    }
+    
+    private LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) {
+        InventoryIncrementalJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress();
+        if (null == jobItemProgress) {
+            return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", jobItemInfo.getErrorMessage());
+        }
+        String incrementalIdleSeconds = "";
+        if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
+            long latestActiveTimeMillis = Math.max(jobItemInfo.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
+            incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
+        }
+        return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
+                jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), jobItemInfo.getInventoryFinishedPercentage(),
+                incrementalIdleSeconds, jobItemInfo.getErrorMessage());
     }
     
     @Override
@@ -68,6 +67,6 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
     
     @Override
     public String getType() {
-        return ShowMigrationStatusStatement.class.getName();
+        return ShowStreamingStatusStatement.class.getName();
     }
 }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
new file mode 100644
index 00000000000..33587a16c32
--- /dev/null
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.cdc.distsql.handler.query;
+
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Show streaming list executor.
+ */
+public final class ShowStreamingListExecutor implements QueryableRALExecutor<ShowStreamingListStatement> {
+    
+    private final CDCJobAPI jobAPI = new CDCJobAPI();
+    
+    @Override
+    public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
+        return jobAPI.list().stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+                ((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
+                each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
+                each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
+    }
+    
+    @Override
+    public Collection<String> getColumnNames() {
+        return Arrays.asList("id", "database", "tables", "job_item_count", "active", "create_time", "stop_time");
+    }
+    
+    @Override
+    public String getType() {
+        return ShowStreamingListStatement.class.getName();
+    }
+}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index 3edc84e8d59..a9462401b5f 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -42,23 +42,22 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
         InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
         List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
-        return jobItemInfos.stream().map(each -> {
-            LocalDataQueryResultRow row;
-            InventoryIncrementalJobItemProgress jobItemProgress = each.getJobItemProgress();
-            if (null != jobItemProgress) {
-                String incrementalIdleSeconds = "";
-                if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
-                    long latestActiveTimeMillis = Math.max(each.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
-                    incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
-                }
-                row = new LocalDataQueryResultRow(each.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
-                        jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), each.getInventoryFinishedPercentage(),
-                        incrementalIdleSeconds, each.getErrorMessage());
-            } else {
-                row = new LocalDataQueryResultRow(each.getShardingItem(), "", "", "", "", "", "", each.getErrorMessage());
-            }
-            return row;
-        }).collect(Collectors.toList());
+        return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
+    }
+    
+    private LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) {
+        InventoryIncrementalJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress();
+        if (null == jobItemProgress) {
+            return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", jobItemInfo.getErrorMessage());
+        }
+        String incrementalIdleSeconds = "";
+        if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
+            long latestActiveTimeMillis = Math.max(jobItemInfo.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
+            incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
+        }
+        return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
+                jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), jobItemInfo.getInventoryFinishedPercentage(),
+                incrementalIdleSeconds, jobItemInfo.getErrorMessage());
     }
     
     @Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
index d8fbcfbc3aa..684c1dcc4cb 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
+++ b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
@@ -20,3 +20,5 @@ org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationJobStatus
 org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckStatusExecutor
 org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationSourceStorageUnitsExecutor
 org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckAlgorithmsExecutor
+org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingListExecutor
+org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingJobStatusExecutor
diff --git a/kernel/data-pipeline/distsql/parser/pom.xml b/kernel/data-pipeline/distsql/parser/pom.xml
index 76ffaad1088..8a83c3bdfd7 100644
--- a/kernel/data-pipeline/distsql/parser/pom.xml
+++ b/kernel/data-pipeline/distsql/parser/pom.xml
@@ -102,6 +102,18 @@
                                     <visitor>true</visitor>
                                 </configuration>
                             </execution>
+                            <execution>
+                                <id>antlr-cdc</id>
+                                <goals>
+                                    <goal>antlr4</goal>
+                                </goals>
+                                <configuration>
+                                    <sourceDirectory>src/main/antlr4/cdc/</sourceDirectory>
+                                    <libDirectory>src/main/antlr4/imports/cdc/</libDirectory>
+                                    <listener>false</listener>
+                                    <visitor>true</visitor>
+                                </configuration>
+                            </execution>
                         </executions>
                     </plugin>
                 </plugins>
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index 05ab6eb54c8..a7738218450 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -15,19 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+grammar CDCDistSQLStatement;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import Symbol, RALStatement;
 
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
-}
+// Entry rule: one CDC DistSQL statement, optionally followed by a semicolon.
+// The terminator must be SEMI_, the name under which the imported Symbol lexer grammar declares ';'.
+// An undeclared token name here would only be implicitly defined and could never be produced by the lexer.
+execute
+    : (showStreamingList
+    | showStreamingStatus
+    ) SEMI_?
+    ;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Alphabet.g4
similarity index 56%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Alphabet.g4
index 05ab6eb54c8..f7603cc7e63 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Alphabet.g4
@@ -15,19 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+lexer grammar Alphabet;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+FOR_GENERATOR: 'DO NOT MATCH ANY THING, JUST FOR GENERATOR';
 
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
-}
+fragment A:   [Aa];
+fragment B:   [Bb];
+fragment C:   [Cc];
+fragment D:   [Dd];
+fragment E:   [Ee];
+fragment F:   [Ff];
+fragment G:   [Gg];
+fragment H:   [Hh];
+fragment I:   [Ii];
+fragment J:   [Jj];
+fragment K:   [Kk];
+fragment L:   [Ll];
+fragment M:   [Mm];
+fragment N:   [Nn];
+fragment O:   [Oo];
+fragment P:   [Pp];
+fragment Q:   [Qq];
+fragment R:   [Rr];
+fragment S:   [Ss];
+fragment T:   [Tt];
+fragment U:   [Uu];
+fragment V:   [Vv];
+fragment W:   [Ww];
+fragment X:   [Xx];
+fragment Y:   [Yy];
+fragment Z:   [Zz];
+fragment UL_: '_';
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
index 05ab6eb54c8..42a04424348 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
@@ -15,19 +15,6 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+grammar BaseRule;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
-}
+import Symbol, Keyword, Literals;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
index 05ab6eb54c8..4f9a2b84b3a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
@@ -15,19 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+lexer grammar Keyword;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import Alphabet;
 
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
-}
+WS
+    : [ \t\r\n] + ->skip
+    ;
+
+SHOW
+    : S H O W
+    ;
+
+STREAMING
+    : S T R E A M I N G
+    ;
+
+LIST
+    : L I S T
+    ;
+
+STATUS
+    : S T A T U S
+    ;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Literals.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Literals.g4
index 05ab6eb54c8..68f6a1b07ed 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Literals.g4
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+lexer grammar Literals;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import Alphabet, Symbol;
 
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
-}
+// Identifier: either an unquoted word that contains at least one letter, underscore or dollar sign
+// (so a digits-only run does not match this alternative), or any non-empty text enclosed in backticks.
+IDENTIFIER_
+    : [A-Za-z_$0-9]*?[A-Za-z_$]+?[A-Za-z_$0-9]*
+    | BQ_ ~'`'+ BQ_
+    ;
+
+// String literal delimited by double or single quotes. Inside the literal a backslash consumes the
+// following character, and a doubled delimiter ("" or '') is accepted without ending the literal.
+STRING_
+    : (DQ_ ('\\'. | '""' | ~('"' | '\\'))* DQ_)
+    | (SQ_ ('\\'. | '\'\'' | ~('\'' | '\\'))* SQ_)
+    ;
+
+// Non-empty run of decimal digits.
+INT_
+    : [0-9]+
+    ;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 05ab6eb54c8..20072e56999 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+grammar RALStatement;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import BaseRule;
 
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
-}
+// SHOW STREAMING LIST
+showStreamingList
+    : SHOW STREAMING LIST
+    ;
+
+// SHOW STREAMING STATUS <jobId>
+showStreamingStatus
+    : SHOW STREAMING STATUS jobId
+    ;
+
+// Job id, written either as an integer literal (INT_) or as a quoted string literal (STRING_).
+jobId
+    : INT_ | STRING_
+    ;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Symbol.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Symbol.g4
new file mode 100644
index 00000000000..1da7901ec98
--- /dev/null
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Symbol.g4
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+lexer grammar Symbol;
+
+AND_:                '&&';
+OR_:                 '||';
+NOT_:                '!';
+TILDE_:              '~';
+VERTICALBAR_:        '|';
+AMPERSAND_:          '&';
+SIGNEDLEFTSHIFT_:    '<<';
+SIGNEDRIGHTSHIFT_:   '>>';
+CARET_:              '^';
+MOD_:                '%';
+COLON_:              ':';
+PLUS_:               '+';
+MINUS_:              '-';
+ASTERISK_:           '*';
+SLASH_:              '/';
+BACKSLASH_:          '\\';
+DOT_:                '.';
+DOTASTERISK_:        '.*';
+SAFEEQ_:             '<=>';
+DEQ_:                '==';
+EQ_:                 '=';
+NEQ_:                '<>' | '!=';
+GT_:                 '>';
+GTE_:                '>=';
+LT_:                 '<';
+LTE_:                '<=';
+POUND_:              '#';
+LP_:                 '(';
+RP_:                 ')';
+LBE_:                '{';
+RBE_:                '}';
+LBT_:                '[';
+RBT_:                ']';
+COMMA_:              ',';
+DQ_:                 '"';
+SQ_:                 '\'';
+BQ_:                 '`';
+QUESTION_:           '?';
+AT_:                 '@';
+SEMI_:               ';';
+JSONSEPARATOR_:      '->>';
+UL_:                 '_';
+DL_:                 '$';
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLLexer.java
similarity index 64%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLLexer.java
index 05ab6eb54c8..ece1e7d4884 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLLexer.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.cdc.distsql.parser.core;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.antlr.v4.runtime.CharStream;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementLexer;
+import org.apache.shardingsphere.sql.parser.api.parser.SQLLexer;
 
 /**
- * Table based pipeline job info.
+ * SQL lexer for CDC DistSQL.
  */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
+public final class CDCDistSQLLexer extends CDCDistSQLStatementLexer implements SQLLexer {
     
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
+    public CDCDistSQLLexer(final CharStream input) {
+        super(input);
+    }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLParser.java
similarity index 52%
copy from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
copy to kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLParser.java
index a62bf5cda31..a1e1fd73027 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLParser.java
@@ -15,29 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.api.pojo;
+package org.apache.shardingsphere.cdc.distsql.parser.core;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-
-import java.util.List;
-import java.util.Map;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser;
+import org.apache.shardingsphere.sql.parser.api.parser.SQLParser;
+import org.apache.shardingsphere.sql.parser.api.visitor.ASTNode;
+import org.apache.shardingsphere.sql.parser.core.ParseASTNode;
 
 /**
- * Stream data parameter.
+ * SQL parser for CDC DistSQL.
  */
-@RequiredArgsConstructor
-@Getter
-public final class StreamDataParameter {
-    
-    private final String database;
-    
-    private final List<String> schemaTableNames;
-    
-    private final boolean full;
+public final class CDCDistSQLParser extends CDCDistSQLStatementParser implements SQLParser {
     
-    private final Map<String, List<DataNode>> dataNodesMap;
+    public CDCDistSQLParser(final TokenStream input) {
+        super(input);
+    }
     
-    private final boolean decodeWithTX;
+    @Override
+    public ASTNode parse() {
+        return new ParseASTNode(execute(), (CommonTokenStream) getTokenStream());
+    }
 }
diff --git a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
new file mode 100644
index 00000000000..cdb3cd8aa22
--- /dev/null
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.cdc.distsql.parser.core;
+
+import org.antlr.v4.runtime.tree.ParseTree;
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
+import org.apache.shardingsphere.sql.parser.api.visitor.ASTNode;
+import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
+import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
+
+/**
+ * SQL statement visitor for CDC DistSQL.
+ */
+public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor {
+    
+    @Override
+    public ASTNode visitShowStreamingList(final ShowStreamingListContext ctx) {
+        return new ShowStreamingListStatement();
+    }
+    
+    @Override
+    public ASTNode visitShowStreamingStatus(final ShowStreamingStatusContext ctx) {
+        return new ShowStreamingStatusStatement(getIdentifierValue(ctx.jobId()));
+    }
+    
+    private String getIdentifierValue(final ParseTree ctx) {
+        return null == ctx ? null : new IdentifierValue(ctx.getText()).getValue();
+    }
+}
diff --git a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/facade/CDCDistSQLStatementParserFacade.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/facade/CDCDistSQLStatementParserFacade.java
new file mode 100644
index 00000000000..44b97c14b9b
--- /dev/null
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/facade/CDCDistSQLStatementParserFacade.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.cdc.distsql.parser.facade;
+
+import org.apache.shardingsphere.cdc.distsql.parser.core.CDCDistSQLLexer;
+import org.apache.shardingsphere.cdc.distsql.parser.core.CDCDistSQLParser;
+import org.apache.shardingsphere.cdc.distsql.parser.core.CDCDistSQLStatementVisitor;
+import org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade;
+import org.apache.shardingsphere.sql.parser.api.parser.SQLLexer;
+import org.apache.shardingsphere.sql.parser.api.parser.SQLParser;
+import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
+
+/**
+ * SQL parser facade for CDC DistSQL statement.
+ *
+ * <p>Exposes the CDC lexer, parser and visitor classes together with the type name {@code CDC}.</p>
+ */
+public final class CDCDistSQLStatementParserFacade implements FeaturedDistSQLStatementParserFacade {
+    
+    @Override
+    public Class<? extends SQLLexer> getLexerClass() {
+        return CDCDistSQLLexer.class;
+    }
+    
+    @Override
+    public Class<? extends SQLParser> getParserClass() {
+        return CDCDistSQLParser.class;
+    }
+    
+    @Override
+    public Class<? extends SQLVisitor> getVisitorClass() {
+        return CDCDistSQLStatementVisitor.class;
+    }
+    
+    // NOTE(review): presumably the key under which this facade is looked up via SPI; confirm against FeaturedDistSQLStatementParserFacade.
+    @Override
+    public String getType() {
+        return "CDC";
+    }
+}
diff --git a/kernel/data-pipeline/distsql/parser/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade b/kernel/data-pipeline/distsql/parser/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade
index b3799653425..37f8d9a8ba1 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade
+++ b/kernel/data-pipeline/distsql/parser/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade
@@ -16,3 +16,4 @@
 #
 
 org.apache.shardingsphere.migration.distsql.parser.facade.MigrationDistSQLStatementParserFacade
+org.apache.shardingsphere.cdc.distsql.parser.facade.CDCDistSQLStatementParserFacade
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingListStatement.java
similarity index 69%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingListStatement.java
index 05ab6eb54c8..5a7edd4666e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingListStatement.java
@@ -15,19 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.cdc.distsql.statement;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
 
 /**
- * Table based pipeline job info.
+ * Show streaming list statement.
  */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
+public final class ShowStreamingListStatement extends QueryableScalingRALStatement {
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingStatusStatement.java
similarity index 74%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingStatusStatement.java
index 05ab6eb54c8..8a56b2c97ca 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingStatusStatement.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.cdc.distsql.statement;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
 
 /**
- * Table based pipeline job info.
+ * Show streaming status statement.
  */
 @RequiredArgsConstructor
 @Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
+public final class ShowStreamingStatusStatement extends QueryableScalingRALStatement {
     
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
+    private final String jobId;
 }
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index ddfe27888ba..b5b6017681f 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -98,7 +98,8 @@ public final class CDCBackendHandler {
             tableNames = schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
             schemaTableNameMap.forEach((k, v) -> v.forEach(tableName -> schemaTableNames.add(k.isEmpty() ? tableName : String.join(".", k, tableName))));
         } else {
-            tableNames = getTableNamesWithoutSchema(database, requestBody.getSourceSchemaTablesList());
+            schemaTableNames.addAll(getTableNamesWithoutSchema(database, requestBody.getSourceSchemaTablesList()));
+            tableNames = schemaTableNames;
         }
         if (tableNames.isEmpty()) {
             throw new NotFindStreamDataSourceTableException();
@@ -176,13 +177,13 @@ public final class CDCBackendHandler {
     }
     
     /**
-     * Get database by job id.
+     * Get database name by job id.
      *
      * @param jobId job id
      * @return database name
      */
-    public String getDatabaseByJobId(final String jobId) {
-        return ((CDCJobConfiguration) jobAPI.getJobConfiguration(jobId)).getDatabase();
+    public String getDatabaseNameByJobId(final String jobId) {
+        return jobAPI.getJobConfiguration(jobId).getDatabaseName();
     }
     
     /**
@@ -196,7 +197,7 @@ public final class CDCBackendHandler {
      */
     // TODO not return CDCResponse
     public CDCResponse startStreaming(final String requestId, final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
-        CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration) jobAPI.getJobConfiguration(jobId);
+        CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
         if (null == cdcJobConfig) {
             return CDCResponseGenerator.failed(jobId, CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config doesn't exist", jobId));
         }
@@ -204,11 +205,11 @@ public final class CDCBackendHandler {
         // TODO, ensure that there is only one consumer at a time, job config disable may not be updated when the program is forced to close
         jobConfigPOJO.setDisabled(false);
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
-        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabase());
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
         Comparator<DataRecord> dataRecordComparator = cdcJobConfig.isDecodeWithTX()
                 ? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
                 : null;
-        CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
+        CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, cdcJobConfig.getDatabaseName(), cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
         job.setJobBootstrap(oneOffJobBootstrap);
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 674f74b9e17..29ef39bcfdb 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -208,7 +208,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
                     .addListener(ChannelFutureListener.CLOSE);
             return;
         }
-        String database = backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+        String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
         checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
         CDCResponse response = backendHandler.startStreaming(request.getRequestId(), requestBody.getStreamingId(), connectionContext, ctx.channel());
         ctx.writeAndFlush(response);
@@ -216,7 +216,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
     
     private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
-        String database = backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+        String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
         checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
         backendHandler.stopStreaming(connectionContext.getJobId());
         connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
@@ -226,7 +226,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
     
     private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
-        String database = backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+        String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
         checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
         try {
             backendHandler.dropStreaming(connectionContext.getJobId());
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingListStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingListStatementAssert.java
new file mode 100644
index 00000000000..6e3cf925837
--- /dev/null
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingListStatementAssert.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.cdc;
+
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Static assertion helper for parsed {@link ShowStreamingListStatement} instances; only the presence of the statement is checked.
+ */
+public final class ShowStreamingListStatementAssert {
+    
+    /**
+     * Assert that the actual statement is null when no test case is expected, and non-null when a test case is given.
+     *
+     * @param assertContext assert context, supplies the text passed as the assertion failure message
+     * @param actual actual show streaming list statement, may be null
+     * @param expected expected show streaming list statement test case, may be null
+     */
+    public static void assertIs(final SQLCaseAssertContext assertContext, final ShowStreamingListStatement actual, final ShowStreamingListStatementTestCase expected) {
+        if (null == expected) {
+            assertNull(assertContext.getText("Actual statement should not exist."), actual);
+        } else {
+            assertNotNull(assertContext.getText("Actual statement should exist."), actual);
+        }
+    }
+}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingStatusStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingStatusStatementAssert.java
new file mode 100644
index 00000000000..78f300cf7c1
--- /dev/null
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingStatusStatementAssert.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.cdc;
+
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.JobIdAssert;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Static assertion helper for parsed {@link ShowStreamingStatusStatement} instances; checks presence and, via {@link JobIdAssert}, the job id.
+ */
+public final class ShowStreamingStatusStatementAssert {
+    
+    /**
+     * Assert that the actual statement is null when no test case is expected; otherwise assert it exists and delegate the job id check to {@link JobIdAssert}.
+     *
+     * @param assertContext assert context, supplies the text passed as the assertion failure message
+     * @param actual actual show streaming status statement, may be null only when expected is null
+     * @param expected expected show streaming status statement test case, may be null
+     */
+    public static void assertIs(final SQLCaseAssertContext assertContext, final ShowStreamingStatusStatement actual, final ShowStreamingStatusStatementTestCase expected) {
+        if (null == expected) {
+            assertNull(assertContext.getText("Actual statement should not exist."), actual);
+        } else {
+            assertNotNull(assertContext.getText("Actual statement should exist."), actual);
+            JobIdAssert.assertJobId(assertContext, actual.getJobId(), expected.getJobId());
+        }
+    }
+}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/migration/QueryableScalingRALStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/migration/QueryableScalingRALStatementAssert.java
index 5cc5574f2cf..924d98f4727 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/migration/QueryableScalingRALStatementAssert.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/migration/QueryableScalingRALStatementAssert.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
 import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
@@ -26,6 +28,8 @@ import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListSt
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.cdc.ShowStreamingListStatementAssert;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.cdc.ShowStreamingStatusStatementAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.query.ShowMigrationCheckAlgorithmsStatementAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.query.ShowMigrationCheckStatusStatementAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.query.ShowMigrationListStatementAssert;
@@ -33,6 +37,8 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.r
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.query.ShowMigrationStatusStatementAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowMigrationListStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.ShowMigrationCheckAlgorithmsStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.ShowMigrationCheckStatusStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.ShowMigrationSourceStorageUnitsStatementTestCase;
@@ -63,6 +69,10 @@ public final class QueryableScalingRALStatementAssert {
             ShowMigrationStatusStatementAssert.assertIs(assertContext, (ShowMigrationStatusStatement) actual, (ShowMigrationStatusStatementTestCase) expected);
         } else if (actual instanceof ShowMigrationSourceStorageUnitsStatement) {
             ShowMigrationSourceStorageUnitsStatementAssert.assertIs(assertContext, (ShowMigrationSourceStorageUnitsStatement) actual, (ShowMigrationSourceStorageUnitsStatementTestCase) expected);
+        } else if (actual instanceof ShowStreamingListStatement) {
+            ShowStreamingListStatementAssert.assertIs(assertContext, (ShowStreamingListStatement) actual, (ShowStreamingListStatementTestCase) expected);
+        } else if (actual instanceof ShowStreamingStatusStatement) {
+            ShowStreamingStatusStatementAssert.assertIs(assertContext, (ShowStreamingStatusStatement) actual, (ShowStreamingStatusStatementTestCase) expected);
         }
     }
 }
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 1a499278bd2..da246c497e8 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -316,6 +316,8 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTrafficRulesStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTransactionRuleStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.UnlabelComputeNodeStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CommitMigrationStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.DropMigrationCheckStatementTestCase;
@@ -1021,6 +1023,12 @@ public final class RootSQLParserTestCases {
     @XmlElement(name = "migrate-table")
     private final List<MigrateTableStatementTestCase> migrateTableTestCases = new LinkedList<>();
     
+    @XmlElement(name = "show-streaming-list")
+    private final List<ShowStreamingListStatementTestCase> showStreamingListTestCases = new LinkedList<>();
+    
+    @XmlElement(name = "show-streaming-status")
+    private final List<ShowStreamingStatusStatementTestCase> showStreamingStatusTestCases = new LinkedList<>();
+    
     @XmlElement(name = "preview-sql")
     private final List<PreviewStatementTestCase> previewTestCases = new LinkedList<>();
     
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingListStatementTestCase.java
similarity index 69%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingListStatementTestCase.java
index 05ab6eb54c8..7910837f867 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingListStatementTestCase.java
@@ -15,19 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
 
 /**
- * Table based pipeline job info.
+ * Show streaming list statement test case.
  */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String table;
+public final class ShowStreamingListStatementTestCase extends SQLParserTestCase {
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingStatusStatementTestCase.java
similarity index 64%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingStatusStatementTestCase.java
index 05ab6eb54c8..1da83f5dec2 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingStatusStatementTestCase.java
@@ -15,19 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+
+import javax.xml.bind.annotation.XmlElement;
 
 /**
- * Table based pipeline job info.
+ * Show streaming status statement test case.
  */
-@RequiredArgsConstructor
 @Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
+@Setter
+public final class ShowStreamingStatusStatementTestCase extends SQLParserTestCase {
     
-    private final String table;
+    @XmlElement(name = "job-id")
+    private String jobId;
 }
diff --git a/test/it/parser/src/main/resources/case/ral/cdc.xml b/test/it/parser/src/main/resources/case/ral/cdc.xml
new file mode 100644
index 00000000000..c0b8e7620f5
--- /dev/null
+++ b/test/it/parser/src/main/resources/case/ral/cdc.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<sql-parser-test-cases>
+    <show-streaming-list sql-case-id="show-streaming-list" />
+    
+    <show-streaming-status sql-case-id="show-streaming-status">
+        <job-id>123</job-id>
+    </show-streaming-status>
+</sql-parser-test-cases>
diff --git a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
new file mode 100644
index 00000000000..31af22ca298
--- /dev/null
+++ b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<sql-cases>
+    <sql-case id="show-streaming-list" value="SHOW STREAMING LIST;" db-types="ShardingSphere"/>
+    <sql-case id="show-streaming-status" value="SHOW STREAMING STATUS 123;" db-types="ShardingSphere"/>
+</sql-cases>