You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/02 04:47:46 UTC
[shardingsphere] branch master updated: Add show streaming list/status DistSQL for CDC (#24404)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7425026a04f Add show streaming list/status DistSQL for CDC (#24404)
7425026a04f is described below
commit 7425026a04f1ec94ced76375292bd5ba558bc617
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Mar 2 12:47:33 2023 +0800
Add show streaming list/status DistSQL for CDC (#24404)
* Add show streaming list/status DistSQL for CDC
* Add DistSQL unit test
* Add blank line at end
* Fix codestyle and rename database to databaseName
* Improve
* Remove TODO and improve code
* Remove redundant this
---
...Info.java => CDCTableBasedPipelineJobInfo.java} | 10 ++--
.../api/pojo/TableBasedPipelineJobInfo.java | 8 +++
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 18 ++++---
.../pipeline/cdc/api/pojo/StreamDataParameter.java | 2 +-
.../cdc/config/job/CDCJobConfiguration.java | 2 +-
.../cdc/yaml/job/YamlCDCJobConfiguration.java | 2 +-
.../yaml/job/YamlCDCJobConfigurationSwapper.java | 4 +-
.../job/YamlCDCJobConfigurationSwapperTest.java | 4 +-
kernel/data-pipeline/distsql/handler/pom.xml | 5 ++
.../query/ShowStreamingJobStatusExecutor.java} | 47 ++++++++--------
.../handler/query/ShowStreamingListExecutor.java | 55 +++++++++++++++++++
.../query/ShowMigrationJobStatusExecutor.java | 33 ++++++------
....distsql.handler.ral.query.QueryableRALExecutor | 2 +
kernel/data-pipeline/distsql/parser/pom.xml | 12 +++++
.../distsql/parser/autogen/CDCDistSQLStatement.g4} | 21 +++-----
.../src/main/antlr4/imports/cdc/Alphabet.g4} | 43 ++++++++++-----
.../src/main/antlr4/imports/cdc/BaseRule.g4} | 17 +-----
.../parser/src/main/antlr4/imports/cdc/Keyword.g4} | 35 +++++++-----
.../src/main/antlr4/imports/cdc/Literals.g4} | 29 +++++-----
.../src/main/antlr4/imports/cdc/RALStatement.g4} | 27 +++++-----
.../parser/src/main/antlr4/imports/cdc/Symbol.g4 | 62 ++++++++++++++++++++++
.../cdc/distsql/parser/core/CDCDistSQLLexer.java} | 19 ++++---
.../cdc/distsql/parser/core/CDCDistSQLParser.java} | 35 ++++++------
.../parser/core/CDCDistSQLStatementVisitor.java | 48 +++++++++++++++++
.../facade/CDCDistSQLStatementParserFacade.java | 52 ++++++++++++++++++
...engine.spi.FeaturedDistSQLStatementParserFacade | 1 +
.../statement/ShowStreamingListStatement.java} | 15 ++----
.../statement/ShowStreamingStatusStatement.java} | 11 ++--
.../backend/handler/cdc/CDCBackendHandler.java | 15 +++---
.../frontend/netty/CDCChannelInboundHandler.java | 6 +--
.../impl/cdc/ShowStreamingListStatementAssert.java | 46 ++++++++++++++++
.../cdc/ShowStreamingStatusStatementAssert.java | 48 +++++++++++++++++
.../QueryableScalingRALStatementAssert.java | 10 ++++
.../cases/parser/jaxb/RootSQLParserTestCases.java | 8 +++
.../cdc/ShowStreamingListStatementTestCase.java | 15 ++----
.../cdc/ShowStreamingStatusStatementTestCase.java | 18 ++++---
test/it/parser/src/main/resources/case/ral/cdc.xml | 25 +++++++++
.../src/main/resources/sql/supported/ral/cdc.xml | 22 ++++++++
38 files changed, 614 insertions(+), 218 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
similarity index 83%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
index 05ab6eb54c8..77e836483c7 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
@@ -21,13 +21,15 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
- * Table based pipeline job info.
+ * CDC table based pipeline job info.
*/
-@RequiredArgsConstructor
@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
+@RequiredArgsConstructor
+public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo {
private final PipelineJobMetaData jobMetaData;
- private final String table;
+ private final String databaseName;
+
+ private final String schemaTableNames;
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
index 05ab6eb54c8..ef321c784e4 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
@@ -29,5 +29,13 @@ public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
private final PipelineJobMetaData jobMetaData;
+ private final String databaseName;
+
private final String table;
+
+ public TableBasedPipelineJobInfo(final PipelineJobMetaData jobMetaData, final String table) {
+ this.jobMetaData = jobMetaData;
+ databaseName = null;
+ this.table = table;
+ }
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 76e1c96e84a..89ba6a17672 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -43,6 +43,8 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
@@ -115,7 +117,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
*/
public String createJob(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
- yamlJobConfig.setDatabase(param.getDatabase());
+ yamlJobConfig.setDatabaseName(param.getDatabaseName());
yamlJobConfig.setSchemaTableNames(param.getSchemaTableNames());
yamlJobConfig.setFull(param.isFull());
yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
@@ -123,7 +125,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
sinkConfig.setSinkType(sinkType.name());
sinkConfig.setProps(sinkProps);
yamlJobConfig.setSinkConfig(sinkConfig);
- ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabase());
+ ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtil.convertDataNodesToLines(param.getDataNodesMap());
yamlJobConfig.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
@@ -203,7 +205,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
private String generateJobId(final YamlCDCJobConfiguration config) {
// TODO generate parameter add sink type
- CDCJobId jobId = new CDCJobId(config.getDatabase(), config.getSchemaTableNames(), config.isFull());
+ CDCJobId jobId = new CDCJobId(config.getDatabaseName(), config.getSchemaTableNames(), config.isFull());
return marshalJobId(jobId);
}
@@ -270,12 +272,12 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
}
@Override
- public PipelineJobConfiguration getJobConfiguration(final String jobId) {
+ public CDCJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
}
@Override
- protected PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+ protected CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}
@@ -286,7 +288,11 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
@Override
protected PipelineJobInfo getJobInfo(final String jobId) {
- throw new UnsupportedOperationException();
+ JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobId, !jobConfigPOJO.isDisabled(),
+ jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
+ CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+ return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}
@Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
index a62bf5cda31..36c9ee91232 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
@@ -31,7 +31,7 @@ import java.util.Map;
@Getter
public final class StreamDataParameter {
- private final String database;
+ private final String databaseName;
private final List<String> schemaTableNames;
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 45a639f7fda..8df7a2783b6 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -36,7 +36,7 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
private final String jobId;
- private final String database;
+ private final String databaseName;
private final List<String> schemaTableNames;
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
index c0c18380d7e..5559dc2e9ec 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
@@ -34,7 +34,7 @@ public final class YamlCDCJobConfiguration implements YamlPipelineJobConfigurati
private String jobId;
- private String database;
+ private String databaseName;
private List<String> schemaTableNames;
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 2ec6780748c..8041712e616 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -42,7 +42,7 @@ public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSw
public YamlCDCJobConfiguration swapToYamlConfiguration(final CDCJobConfiguration data) {
YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
result.setJobId(data.getJobId());
- result.setDatabase(data.getDatabase());
+ result.setDatabaseName(data.getDatabaseName());
result.setSchemaTableNames(data.getSchemaTableNames());
result.setFull(data.isFull());
result.setSourceDatabaseType(data.getSourceDatabaseType());
@@ -72,7 +72,7 @@ public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSw
YamlSinkConfiguration yamlSinkConfig = yamlConfig.getSinkConfig();
SinkConfiguration sinkConfig = new SinkConfiguration(CDCSinkType.valueOf(yamlSinkConfig.getSinkType()), yamlSinkConfig.getProps());
JobDataNodeLine tablesFirstDataNodes = null == yamlConfig.getTablesFirstDataNodes() ? null : JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
- return new CDCJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabase(), yamlConfig.getSchemaTableNames(), yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
+ return new CDCJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(), yamlConfig.getSchemaTableNames(), yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
(ShardingSpherePipelineDataSourceConfiguration) dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()), tablesFirstDataNodes,
jobShardingDataNodes, yamlConfig.isDecodeWithTX(), sinkConfig, yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index 5a62e46a04a..0dd440a7f6f 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -34,7 +34,7 @@ public final class YamlCDCJobConfigurationSwapperTest {
public void assertSwapToObject() {
YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
yamlJobConfig.setJobId("j51017f973ac82cb1edea4f5238a258c25e89");
- yamlJobConfig.setDatabase("test_db");
+ yamlJobConfig.setDatabaseName("test_db");
yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order", "t_order_item"));
yamlJobConfig.setFull(true);
YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
@@ -42,7 +42,7 @@ public final class YamlCDCJobConfigurationSwapperTest {
yamlJobConfig.setSinkConfig(sinkConfig);
CDCJobConfiguration actual = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
assertThat(actual.getJobId(), is("j51017f973ac82cb1edea4f5238a258c25e89"));
- assertThat(actual.getDatabase(), is("test_db"));
+ assertThat(actual.getDatabaseName(), is("test_db"));
assertThat(actual.getSchemaTableNames(), is(Arrays.asList("test.t_order", "t_order_item")));
assertTrue(actual.isFull());
}
diff --git a/kernel/data-pipeline/distsql/handler/pom.xml b/kernel/data-pipeline/distsql/handler/pom.xml
index fd0697db8c7..2eec2fcde88 100644
--- a/kernel/data-pipeline/distsql/handler/pom.xml
+++ b/kernel/data-pipeline/distsql/handler/pom.xml
@@ -43,6 +43,11 @@
<artifactId>shardingsphere-data-pipeline-scenario-migration</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-data-pipeline-cdc-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-data-pipeline-distsql-statement</artifactId>
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
similarity index 56%
copy from kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
copy to kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index 3edc84e8d59..f308304f44a 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.migration.distsql.handler.query;
+package org.apache.shardingsphere.cdc.distsql.handler.query;
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
@@ -24,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
import java.util.Arrays;
import java.util.Collection;
@@ -33,32 +33,31 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
- * Show migration job status executor.
+ * Show streaming job status executor.
*/
-public final class ShowMigrationJobStatusExecutor implements QueryableRALExecutor<ShowMigrationStatusStatement> {
+public final class ShowStreamingJobStatusExecutor implements QueryableRALExecutor<ShowStreamingStatusStatement> {
@Override
- public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationStatusStatement sqlStatement) {
- InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
+ public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
+ InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "CDC");
List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
- return jobItemInfos.stream().map(each -> {
- LocalDataQueryResultRow row;
- InventoryIncrementalJobItemProgress jobItemProgress = each.getJobItemProgress();
- if (null != jobItemProgress) {
- String incrementalIdleSeconds = "";
- if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
- long latestActiveTimeMillis = Math.max(each.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
- incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
- }
- row = new LocalDataQueryResultRow(each.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
- jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), each.getInventoryFinishedPercentage(),
- incrementalIdleSeconds, each.getErrorMessage());
- } else {
- row = new LocalDataQueryResultRow(each.getShardingItem(), "", "", "", "", "", "", each.getErrorMessage());
- }
- return row;
- }).collect(Collectors.toList());
+ return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
+ }
+
+ private LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) {
+ InventoryIncrementalJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress();
+ if (null == jobItemProgress) {
+ return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", jobItemInfo.getErrorMessage());
+ }
+ String incrementalIdleSeconds = "";
+ if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
+ long latestActiveTimeMillis = Math.max(jobItemInfo.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
+ incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
+ }
+ return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
+ jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), jobItemInfo.getInventoryFinishedPercentage(),
+ incrementalIdleSeconds, jobItemInfo.getErrorMessage());
}
@Override
@@ -68,6 +67,6 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
@Override
public String getType() {
- return ShowMigrationStatusStatement.class.getName();
+ return ShowStreamingStatusStatement.class.getName();
}
}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
new file mode 100644
index 00000000000..33587a16c32
--- /dev/null
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.cdc.distsql.handler.query;
+
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Show streaming list executor.
+ */
+public final class ShowStreamingListExecutor implements QueryableRALExecutor<ShowStreamingListStatement> {
+
+ private final CDCJobAPI jobAPI = new CDCJobAPI();
+
+ @Override
+ public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
+ return jobAPI.list().stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+ ((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
+ each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
+ each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
+ }
+
+ @Override
+ public Collection<String> getColumnNames() {
+ return Arrays.asList("id", "database", "tables", "job_item_count", "active", "create_time", "stop_time");
+ }
+
+ @Override
+ public String getType() {
+ return ShowStreamingListStatement.class.getName();
+ }
+}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index 3edc84e8d59..a9462401b5f 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -42,23 +42,22 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
- return jobItemInfos.stream().map(each -> {
- LocalDataQueryResultRow row;
- InventoryIncrementalJobItemProgress jobItemProgress = each.getJobItemProgress();
- if (null != jobItemProgress) {
- String incrementalIdleSeconds = "";
- if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
- long latestActiveTimeMillis = Math.max(each.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
- incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
- }
- row = new LocalDataQueryResultRow(each.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
- jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), each.getInventoryFinishedPercentage(),
- incrementalIdleSeconds, each.getErrorMessage());
- } else {
- row = new LocalDataQueryResultRow(each.getShardingItem(), "", "", "", "", "", "", each.getErrorMessage());
- }
- return row;
- }).collect(Collectors.toList());
+ return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
+ }
+
+ private LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) {
+ InventoryIncrementalJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress();
+ if (null == jobItemProgress) {
+ return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", jobItemInfo.getErrorMessage());
+ }
+ String incrementalIdleSeconds = "";
+ if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
+ long latestActiveTimeMillis = Math.max(jobItemInfo.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
+ incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
+ }
+ return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
+ jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), jobItemInfo.getInventoryFinishedPercentage(),
+ incrementalIdleSeconds, jobItemInfo.getErrorMessage());
}
@Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
index d8fbcfbc3aa..684c1dcc4cb 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
+++ b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
@@ -20,3 +20,5 @@ org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationJobStatus
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckStatusExecutor
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationSourceStorageUnitsExecutor
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckAlgorithmsExecutor
+org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingListExecutor
+org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingJobStatusExecutor
diff --git a/kernel/data-pipeline/distsql/parser/pom.xml b/kernel/data-pipeline/distsql/parser/pom.xml
index 76ffaad1088..8a83c3bdfd7 100644
--- a/kernel/data-pipeline/distsql/parser/pom.xml
+++ b/kernel/data-pipeline/distsql/parser/pom.xml
@@ -102,6 +102,18 @@
<visitor>true</visitor>
</configuration>
</execution>
+ <execution>
+ <id>antlr-cdc</id>
+ <goals>
+ <goal>antlr4</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>src/main/antlr4/cdc/</sourceDirectory>
+ <libDirectory>src/main/antlr4/imports/cdc/</libDirectory>
+ <listener>false</listener>
+ <visitor>true</visitor>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index 05ab6eb54c8..a7738218450 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -15,19 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+grammar CDCDistSQLStatement;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import Symbol, RALStatement;
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
-}
+execute
+ : (showStreamingList
+ | showStreamingStatus
+ ) SEMI?
+ ;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Alphabet.g4
similarity index 56%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Alphabet.g4
index 05ab6eb54c8..f7603cc7e63 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Alphabet.g4
@@ -15,19 +15,34 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+lexer grammar Alphabet;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+FOR_GENERATOR: 'DO NOT MATCH ANY THING, JUST FOR GENERATOR';
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
-}
+fragment A: [Aa];
+fragment B: [Bb];
+fragment C: [Cc];
+fragment D: [Dd];
+fragment E: [Ee];
+fragment F: [Ff];
+fragment G: [Gg];
+fragment H: [Hh];
+fragment I: [Ii];
+fragment J: [Jj];
+fragment K: [Kk];
+fragment L: [Ll];
+fragment M: [Mm];
+fragment N: [Nn];
+fragment O: [Oo];
+fragment P: [Pp];
+fragment Q: [Qq];
+fragment R: [Rr];
+fragment S: [Ss];
+fragment T: [Tt];
+fragment U: [Uu];
+fragment V: [Vv];
+fragment W: [Ww];
+fragment X: [Xx];
+fragment Y: [Yy];
+fragment Z: [Zz];
+fragment UL_: '_';
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
index 05ab6eb54c8..42a04424348 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
@@ -15,19 +15,6 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+grammar BaseRule;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
-}
+import Symbol, Keyword, Literals;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
index 05ab6eb54c8..4f9a2b84b3a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
@@ -15,19 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+lexer grammar Keyword;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import Alphabet;
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
-}
+WS
+ : [ \t\r\n] + ->skip
+ ;
+
+SHOW
+ : S H O W
+ ;
+
+STREAMING
+ : S T R E A M I N G
+ ;
+
+LIST
+ : L I S T
+ ;
+
+STATUS
+ : S T A T U S
+ ;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Literals.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Literals.g4
index 05ab6eb54c8..68f6a1b07ed 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Literals.g4
@@ -15,19 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+lexer grammar Literals;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import Alphabet, Symbol;
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
-}
+IDENTIFIER_
+ : [A-Za-z_$0-9]*?[A-Za-z_$]+?[A-Za-z_$0-9]*
+ | BQ_ ~'`'+ BQ_
+ ;
+
+STRING_
+ : (DQ_ ('\\'. | '""' | ~('"' | '\\'))* DQ_)
+ | (SQ_ ('\\'. | '\'\'' | ~('\'' | '\\'))* SQ_)
+ ;
+
+INT_
+ : [0-9]+
+ ;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
similarity index 68%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 05ab6eb54c8..20072e56999 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+grammar RALStatement;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import BaseRule;
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
-}
+showStreamingList
+ : SHOW STREAMING LIST
+ ;
+
+showStreamingStatus
+ : SHOW STREAMING STATUS jobId
+ ;
+
+jobId
+ : INT_ | STRING_
+ ;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Symbol.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Symbol.g4
new file mode 100644
index 00000000000..1da7901ec98
--- /dev/null
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Symbol.g4
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+lexer grammar Symbol;
+
+AND_: '&&';
+OR_: '||';
+NOT_: '!';
+TILDE_: '~';
+VERTICALBAR_: '|';
+AMPERSAND_: '&';
+SIGNEDLEFTSHIFT_: '<<';
+SIGNEDRIGHTSHIFT_: '>>';
+CARET_: '^';
+MOD_: '%';
+COLON_: ':';
+PLUS_: '+';
+MINUS_: '-';
+ASTERISK_: '*';
+SLASH_: '/';
+BACKSLASH_: '\\';
+DOT_: '.';
+DOTASTERISK_: '.*';
+SAFEEQ_: '<=>';
+DEQ_: '==';
+EQ_: '=';
+NEQ_: '<>' | '!=';
+GT_: '>';
+GTE_: '>=';
+LT_: '<';
+LTE_: '<=';
+POUND_: '#';
+LP_: '(';
+RP_: ')';
+LBE_: '{';
+RBE_: '}';
+LBT_: '[';
+RBT_: ']';
+COMMA_: ',';
+DQ_: '"';
+SQ_: '\'';
+BQ_: '`';
+QUESTION_: '?';
+AT_: '@';
+SEMI_: ';';
+JSONSEPARATOR_: '->>';
+UL_: '_';
+DL_: '$';
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLLexer.java
similarity index 64%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLLexer.java
index 05ab6eb54c8..ece1e7d4884 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLLexer.java
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.cdc.distsql.parser.core;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.antlr.v4.runtime.CharStream;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementLexer;
+import org.apache.shardingsphere.sql.parser.api.parser.SQLLexer;
/**
- * Table based pipeline job info.
+ * SQL lexer for CDC DistSQL.
*/
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
+public final class CDCDistSQLLexer extends CDCDistSQLStatementLexer implements SQLLexer {
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
+ public CDCDistSQLLexer(final CharStream input) {
+ super(input);
+ }
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLParser.java
similarity index 52%
copy from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
copy to kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLParser.java
index a62bf5cda31..a1e1fd73027 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLParser.java
@@ -15,29 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.api.pojo;
+package org.apache.shardingsphere.cdc.distsql.parser.core;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-
-import java.util.List;
-import java.util.Map;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser;
+import org.apache.shardingsphere.sql.parser.api.parser.SQLParser;
+import org.apache.shardingsphere.sql.parser.api.visitor.ASTNode;
+import org.apache.shardingsphere.sql.parser.core.ParseASTNode;
/**
- * Stream data parameter.
+ * SQL parser for CDC.
*/
-@RequiredArgsConstructor
-@Getter
-public final class StreamDataParameter {
-
- private final String database;
-
- private final List<String> schemaTableNames;
-
- private final boolean full;
+public final class CDCDistSQLParser extends CDCDistSQLStatementParser implements SQLParser {
- private final Map<String, List<DataNode>> dataNodesMap;
+ public CDCDistSQLParser(final TokenStream input) {
+ super(input);
+ }
- private final boolean decodeWithTX;
+ @Override
+ public ASTNode parse() {
+ return new ParseASTNode(execute(), (CommonTokenStream) getTokenStream());
+ }
}
diff --git a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
new file mode 100644
index 00000000000..cdb3cd8aa22
--- /dev/null
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.cdc.distsql.parser.core;
+
+import org.antlr.v4.runtime.tree.ParseTree;
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
+import org.apache.shardingsphere.sql.parser.api.visitor.ASTNode;
+import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
+import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
+
+/**
+ * SQL statement visitor for CDC DistSQL.
+ */
+public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor {
+
+ @Override
+ public ASTNode visitShowStreamingList(final ShowStreamingListContext ctx) {
+ return new ShowStreamingListStatement();
+ }
+
+ @Override
+ public ASTNode visitShowStreamingStatus(final ShowStreamingStatusContext ctx) {
+ return new ShowStreamingStatusStatement(getIdentifierValue(ctx.jobId()));
+ }
+
+ private String getIdentifierValue(final ParseTree ctx) {
+ return null == ctx ? null : new IdentifierValue(ctx.getText()).getValue();
+ }
+}
diff --git a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/facade/CDCDistSQLStatementParserFacade.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/facade/CDCDistSQLStatementParserFacade.java
new file mode 100644
index 00000000000..44b97c14b9b
--- /dev/null
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/facade/CDCDistSQLStatementParserFacade.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.cdc.distsql.parser.facade;
+
+import org.apache.shardingsphere.cdc.distsql.parser.core.CDCDistSQLLexer;
+import org.apache.shardingsphere.cdc.distsql.parser.core.CDCDistSQLParser;
+import org.apache.shardingsphere.cdc.distsql.parser.core.CDCDistSQLStatementVisitor;
+import org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade;
+import org.apache.shardingsphere.sql.parser.api.parser.SQLLexer;
+import org.apache.shardingsphere.sql.parser.api.parser.SQLParser;
+import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
+
+/**
+ * SQL parser facade for CDC DistSQL statement.
+ */
+public final class CDCDistSQLStatementParserFacade implements FeaturedDistSQLStatementParserFacade {
+
+ @Override
+ public Class<? extends SQLLexer> getLexerClass() {
+ return CDCDistSQLLexer.class;
+ }
+
+ @Override
+ public Class<? extends SQLParser> getParserClass() {
+ return CDCDistSQLParser.class;
+ }
+
+ @Override
+ public Class<? extends SQLVisitor> getVisitorClass() {
+ return CDCDistSQLStatementVisitor.class;
+ }
+
+ @Override
+ public String getType() {
+ return "CDC";
+ }
+}
diff --git a/kernel/data-pipeline/distsql/parser/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade b/kernel/data-pipeline/distsql/parser/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade
index b3799653425..37f8d9a8ba1 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade
+++ b/kernel/data-pipeline/distsql/parser/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.parser.engine.spi.FeaturedDistSQLStatementParserFacade
@@ -16,3 +16,4 @@
#
org.apache.shardingsphere.migration.distsql.parser.facade.MigrationDistSQLStatementParserFacade
+org.apache.shardingsphere.cdc.distsql.parser.facade.CDCDistSQLStatementParserFacade
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingListStatement.java
similarity index 69%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingListStatement.java
index 05ab6eb54c8..5a7edd4666e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingListStatement.java
@@ -15,19 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.cdc.distsql.statement;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
/**
- * Table based pipeline job info.
+ * Show streaming list statement.
*/
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
+public final class ShowStreamingListStatement extends QueryableScalingRALStatement {
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingStatusStatement.java
similarity index 74%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingStatusStatement.java
index 05ab6eb54c8..8a56b2c97ca 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingStatusStatement.java
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.cdc.distsql.statement;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
/**
- * Table based pipeline job info.
+ * Show streaming status statement.
*/
@RequiredArgsConstructor
@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
+public final class ShowStreamingStatusStatement extends QueryableScalingRALStatement {
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
+ private final String jobId;
}
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index ddfe27888ba..b5b6017681f 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -98,7 +98,8 @@ public final class CDCBackendHandler {
tableNames = schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
schemaTableNameMap.forEach((k, v) -> v.forEach(tableName -> schemaTableNames.add(k.isEmpty() ? tableName : String.join(".", k, tableName))));
} else {
- tableNames = getTableNamesWithoutSchema(database, requestBody.getSourceSchemaTablesList());
+ schemaTableNames.addAll(getTableNamesWithoutSchema(database, requestBody.getSourceSchemaTablesList()));
+ tableNames = schemaTableNames;
}
if (tableNames.isEmpty()) {
throw new NotFindStreamDataSourceTableException();
@@ -176,13 +177,13 @@ public final class CDCBackendHandler {
}
/**
- * Get database by job id.
+ * Get database name by job id.
*
* @param jobId job id
* @return database
*/
- public String getDatabaseByJobId(final String jobId) {
- return ((CDCJobConfiguration) jobAPI.getJobConfiguration(jobId)).getDatabase();
+ public String getDatabaseNameByJobId(final String jobId) {
+ return jobAPI.getJobConfiguration(jobId).getDatabaseName();
}
/**
@@ -196,7 +197,7 @@ public final class CDCBackendHandler {
*/
// TODO not return CDCResponse
public CDCResponse startStreaming(final String requestId, final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
- CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration) jobAPI.getJobConfiguration(jobId);
+ CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
if (null == cdcJobConfig) {
return CDCResponseGenerator.failed(jobId, CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config doesn't exist", jobId));
}
@@ -204,11 +205,11 @@ public final class CDCBackendHandler {
// TODO, ensure that there is only one consumer at a time, job config disable may not be updated when the program is forced to close
jobConfigPOJO.setDisabled(false);
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
- ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabase());
+ ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
Comparator<DataRecord> dataRecordComparator = cdcJobConfig.isDecodeWithTX()
? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
: null;
- CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
+ CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, cdcJobConfig.getDatabaseName(), cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 674f74b9e17..29ef39bcfdb 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -208,7 +208,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
.addListener(ChannelFutureListener.CLOSE);
return;
}
- String database = backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+ String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
CDCResponse response = backendHandler.startStreaming(request.getRequestId(), requestBody.getStreamingId(), connectionContext, ctx.channel());
ctx.writeAndFlush(response);
@@ -216,7 +216,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
- String database = backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+ String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
backendHandler.stopStreaming(connectionContext.getJobId());
connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
@@ -226,7 +226,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
- String database = backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+ String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
try {
backendHandler.dropStreaming(connectionContext.getJobId());
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingListStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingListStatementAssert.java
new file mode 100644
index 00000000000..6e3cf925837
--- /dev/null
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingListStatementAssert.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.cdc;
+
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Show streaming list statement assert.
+ */
+public final class ShowStreamingListStatementAssert {
+
+ /**
+ * Assert show streaming list statement is correct with expected parser result.
+ *
+ * @param assertContext assert context
+ * @param actual actual show streaming list statement
+ * @param expected expected show streaming list statement test case
+ */
+ public static void assertIs(final SQLCaseAssertContext assertContext, final ShowStreamingListStatement actual, final ShowStreamingListStatementTestCase expected) {
+ if (null == expected) {
+ assertNull(assertContext.getText("Actual statement should not exist."), actual);
+ } else {
+ assertNotNull(assertContext.getText("Actual statement should exist."), actual);
+ }
+ }
+}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingStatusStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingStatusStatementAssert.java
new file mode 100644
index 00000000000..78f300cf7c1
--- /dev/null
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/cdc/ShowStreamingStatusStatementAssert.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.cdc;
+
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.JobIdAssert;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Show streaming status statement assert.
+ */
+public final class ShowStreamingStatusStatementAssert {
+
+ /**
+ * Assert show streaming status statement is correct with expected parser result.
+ *
+ * @param assertContext assert context
+ * @param actual actual show streaming status statement
+ * @param expected expected show streaming status statement test case
+ */
+ public static void assertIs(final SQLCaseAssertContext assertContext, final ShowStreamingStatusStatement actual, final ShowStreamingStatusStatementTestCase expected) {
+ if (null == expected) {
+ assertNull(assertContext.getText("Actual statement should not exist."), actual);
+ } else {
+ assertNotNull(assertContext.getText("Actual statement should exist."), actual);
+ JobIdAssert.assertJobId(assertContext, actual.getJobId(), expected.getJobId());
+ }
+ }
+}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/migration/QueryableScalingRALStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/migration/QueryableScalingRALStatementAssert.java
index 5cc5574f2cf..924d98f4727 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/migration/QueryableScalingRALStatementAssert.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/migration/QueryableScalingRALStatementAssert.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
@@ -26,6 +28,8 @@ import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListSt
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.cdc.ShowStreamingListStatementAssert;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.cdc.ShowStreamingStatusStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.query.ShowMigrationCheckAlgorithmsStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.query.ShowMigrationCheckStatusStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.query.ShowMigrationListStatementAssert;
@@ -33,6 +37,8 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.r
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.migration.query.ShowMigrationStatusStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowMigrationListStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.ShowMigrationCheckAlgorithmsStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.ShowMigrationCheckStatusStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.ShowMigrationSourceStorageUnitsStatementTestCase;
@@ -63,6 +69,10 @@ public final class QueryableScalingRALStatementAssert {
ShowMigrationStatusStatementAssert.assertIs(assertContext, (ShowMigrationStatusStatement) actual, (ShowMigrationStatusStatementTestCase) expected);
} else if (actual instanceof ShowMigrationSourceStorageUnitsStatement) {
ShowMigrationSourceStorageUnitsStatementAssert.assertIs(assertContext, (ShowMigrationSourceStorageUnitsStatement) actual, (ShowMigrationSourceStorageUnitsStatementTestCase) expected);
+ } else if (actual instanceof ShowStreamingListStatement) {
+ ShowStreamingListStatementAssert.assertIs(assertContext, (ShowStreamingListStatement) actual, (ShowStreamingListStatementTestCase) expected);
+ } else if (actual instanceof ShowStreamingStatusStatement) {
+ ShowStreamingStatusStatementAssert.assertIs(assertContext, (ShowStreamingStatusStatement) actual, (ShowStreamingStatusStatementTestCase) expected);
}
}
}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 1a499278bd2..da246c497e8 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -316,6 +316,8 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTrafficRulesStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTransactionRuleStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.UnlabelComputeNodeStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CommitMigrationStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.DropMigrationCheckStatementTestCase;
@@ -1021,6 +1023,12 @@ public final class RootSQLParserTestCases {
@XmlElement(name = "migrate-table")
private final List<MigrateTableStatementTestCase> migrateTableTestCases = new LinkedList<>();
+ @XmlElement(name = "show-streaming-list")
+ private final List<ShowStreamingListStatementTestCase> showStreamingListTestCases = new LinkedList<>();
+
+ @XmlElement(name = "show-streaming-status")
+ private final List<ShowStreamingStatusStatementTestCase> showStreamingStatusTestCases = new LinkedList<>();
+
@XmlElement(name = "preview-sql")
private final List<PreviewStatementTestCase> previewTestCases = new LinkedList<>();
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingListStatementTestCase.java
similarity index 69%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingListStatementTestCase.java
index 05ab6eb54c8..7910837f867 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingListStatementTestCase.java
@@ -15,19 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
/**
- * Table based pipeline job info.
+ * Show streaming list statement test case.
*/
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
-
- private final String table;
+public final class ShowStreamingListStatementTestCase extends SQLParserTestCase {
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingStatusStatementTestCase.java
similarity index 64%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
copy to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingStatusStatementTestCase.java
index 05ab6eb54c8..1da83f5dec2 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingStatusStatementTestCase.java
@@ -15,19 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+
+import javax.xml.bind.annotation.XmlElement;
/**
- * Table based pipeline job info.
+ * Show streaming status statement test case.
*/
-@RequiredArgsConstructor
@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
+@Setter
+public final class ShowStreamingStatusStatementTestCase extends SQLParserTestCase {
- private final String table;
+ @XmlElement(name = "job-id")
+ private String jobId;
}
diff --git a/test/it/parser/src/main/resources/case/ral/cdc.xml b/test/it/parser/src/main/resources/case/ral/cdc.xml
new file mode 100644
index 00000000000..c0b8e7620f5
--- /dev/null
+++ b/test/it/parser/src/main/resources/case/ral/cdc.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<sql-parser-test-cases>
+ <show-streaming-list sql-case-id="show-streaming-list" />
+
+ <show-streaming-status sql-case-id="show-streaming-status">
+ <job-id>123</job-id>
+ </show-streaming-status>
+</sql-parser-test-cases>
diff --git a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
new file mode 100644
index 00000000000..31af22ca298
--- /dev/null
+++ b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<sql-cases>
+ <sql-case id="show-streaming-list" value="SHOW STREAMING LIST;" db-types="ShardingSphere"/>
+ <sql-case id="show-streaming-status" value="SHOW STREAMING STATUS 123;" db-types="ShardingSphere"/>
+</sql-cases>