You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/06/14 03:51:21 UTC
[shardingsphere] branch master updated: Refactor commit rollback streaming to drop streaming (#26333)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 74179366841 Refactor commit rollback streaming to drop streaming (#26333)
74179366841 is described below
commit 74179366841bc6a9f9613710d64cca43d0c2c8d6
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Jun 14 11:51:13 2023 +0800
Refactor commit rollback streaming to drop streaming (#26333)
* Remove rollback streaming
* Rename commit streaming to drop streaming
---
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 9 +++--
.../pipeline/cdc/handler/CDCBackendHandler.java | 17 ++------
.../src/main/proto/CDCRequestProtocol.proto | 12 ++----
...amingUpdater.java => DropStreamingUpdater.java} | 12 +++---
.../handler/update/RollbackStreamingUpdater.java | 42 --------------------
...ingsphere.distsql.handler.ral.update.RALUpdater | 3 +-
.../distsql/parser/autogen/CDCDistSQLStatement.g4 | 3 +-
.../parser/src/main/antlr4/imports/cdc/Keyword.g4 | 8 +---
.../src/main/antlr4/imports/cdc/RALStatement.g4 | 8 +---
.../parser/core/CDCDistSQLStatementVisitor.java | 15 ++-----
...gStatement.java => DropStreamingStatement.java} | 4 +-
.../statement/RollbackStreamingStatement.java | 32 ---------------
.../frontend/netty/CDCChannelInboundHandler.java | 30 +++-----------
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 2 +-
.../UpdatablePipelineRALStatementAssert.java | 15 +++----
...sert.java => DropStreamingStatementAssert.java} | 12 +++---
.../cdc/RollbackStreamingStatementAssert.java | 46 ----------------------
.../cases/parser/jaxb/RootSQLParserTestCases.java | 10 ++---
...se.java => DropStreamingStatementTestCase.java} | 4 +-
.../cdc/RollbackStreamingStatementTestCase.java | 35 ----------------
test/it/parser/src/main/resources/case/ral/cdc.xml | 10 ++---
.../src/main/resources/sql/supported/ral/cdc.xml | 3 +-
22 files changed, 57 insertions(+), 275 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 60306b5bc85..c3b11e70d2f 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -350,10 +350,14 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
@Override
public void commit(final String jobId) {
- stopAndDropJob(jobId);
}
- private void stopAndDropJob(final String jobId) {
+ /**
+ * Stop and drop job.
+ *
+ * @param jobId job id
+ */
+ public void stopAndDrop(final String jobId) {
CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
PipelineJobCenter.stop(jobId);
@@ -365,7 +369,6 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
@Override
public void rollback(final String jobId) throws SQLException {
- stopAndDropJob(jobId);
}
@Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 69a58617e37..d13987dd3c0 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -53,7 +53,6 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.single.rule.SingleRule;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -177,22 +176,12 @@ public final class CDCBackendHandler {
}
/**
- * Rollback streaming.
+ * Drop streaming.
*
* @param jobId job ID
- * @throws SQLException SQL exception
*/
- public void rollbackStreaming(final String jobId) throws SQLException {
- jobAPI.rollback(jobId);
- }
-
- /**
- * Commit streaming.
- *
- * @param jobId job ID
- */
- public void commitStreaming(final String jobId) {
- jobAPI.commit(jobId);
+ public void dropStreaming(final String jobId) {
+ jobAPI.stopAndDrop(jobId);
}
/**
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 1ec532fa64e..8e322293cfa 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -31,8 +31,7 @@ message CDCRequest {
ACK_STREAMING = 3;
STOP_STREAMING = 4;
START_STREAMING = 5;
- ROLLBACK_STREAMING = 6;
- COMMIT_STREAMING = 7;
+ DROP_STREAMING = 6;
}
Type type = 3;
oneof request_body {
@@ -41,8 +40,7 @@ message CDCRequest {
AckStreamingRequestBody ack_streaming_request_body = 6;
StopStreamingRequestBody stop_streaming_request_body = 7;
StartStreamingRequestBody start_streaming_request_body = 8;
- RollbackStreamingRequestBody rollback_streaming_request_body = 9;
- CommitStreamingRequestBody commit_streaming_request_body = 10;
+ DropStreamingRequestBody drop_streaming_request_body = 9;
}
}
@@ -84,10 +82,6 @@ message StartStreamingRequestBody {
string streaming_id = 1;
}
-message RollbackStreamingRequestBody {
- string streaming_id = 1;
-}
-
-message CommitStreamingRequestBody {
+message DropStreamingRequestBody {
string streaming_id = 1;
}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
similarity index 73%
rename from kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
rename to kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
index ec8e7eddcc9..92b76b90ee1 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
@@ -17,26 +17,26 @@
package org.apache.shardingsphere.cdc.distsql.handler.update;
-import org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import java.sql.SQLException;
/**
- * Commit streaming updater.
+ * Drop streaming updater.
*/
-public final class CommitStreamingUpdater implements RALUpdater<CommitStreamingStatement> {
+public final class DropStreamingUpdater implements RALUpdater<DropStreamingStatement> {
private final CDCJobAPI jobAPI = new CDCJobAPI();
@Override
- public void executeUpdate(final String databaseName, final CommitStreamingStatement sqlStatement) throws SQLException {
- jobAPI.commit(sqlStatement.getJobId());
+ public void executeUpdate(final String databaseName, final DropStreamingStatement sqlStatement) throws SQLException {
+ jobAPI.stopAndDrop(sqlStatement.getJobId());
}
@Override
public String getType() {
- return CommitStreamingStatement.class.getName();
+ return DropStreamingStatement.class.getName();
}
}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
deleted file mode 100644
index a6a2ef579d4..00000000000
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.cdc.distsql.handler.update;
-
-import org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
-import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
-
-import java.sql.SQLException;
-
-/**
- * Rollback streaming updater.
- */
-public final class RollbackStreamingUpdater implements RALUpdater<RollbackStreamingStatement> {
-
- private final CDCJobAPI jobAPI = new CDCJobAPI();
-
- @Override
- public void executeUpdate(final String databaseName, final RollbackStreamingStatement sqlStatement) throws SQLException {
- jobAPI.rollback(sqlStatement.getJobId());
- }
-
- @Override
- public String getType() {
- return RollbackStreamingStatement.class.getName();
- }
-}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
index 63de7ed6cac..0cb7002d6ab 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
+++ b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
@@ -26,5 +26,4 @@ org.apache.shardingsphere.migration.distsql.handler.update.CheckMigrationJobUpda
org.apache.shardingsphere.migration.distsql.handler.update.StartMigrationCheckUpdater
org.apache.shardingsphere.migration.distsql.handler.update.StopMigrationCheckUpdater
org.apache.shardingsphere.migration.distsql.handler.update.DropMigrationCheckUpdater
-org.apache.shardingsphere.cdc.distsql.handler.update.RollbackStreamingUpdater
-org.apache.shardingsphere.cdc.distsql.handler.update.CommitStreamingUpdater
+org.apache.shardingsphere.cdc.distsql.handler.update.DropStreamingUpdater
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index ed0637e4c0b..0395586766a 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -22,7 +22,6 @@ import Symbol, RALStatement;
execute
: (showStreamingList
| showStreamingStatus
- | rollbackStreaming
- | commitStreaming
+ | dropStreaming
) SEMI_? EOF
;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
index e5b28e62df8..9eb03da7e4d 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
@@ -39,10 +39,6 @@ STATUS
: S T A T U S
;
-ROLLBACK
- : R O L L B A C K
- ;
-
-COMMIT
- : C O M M I T
+DROP
+ : D R O P
;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 5f591718437..03ca7573ace 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -27,12 +27,8 @@ showStreamingStatus
: SHOW STREAMING STATUS jobId
;
-rollbackStreaming
- : ROLLBACK STREAMING jobId
- ;
-
-commitStreaming
- : COMMIT STREAMING jobId
+dropStreaming
+ : DROP STREAMING jobId
;
jobId
diff --git a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
index 31f93c3f279..37db49fd40c 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -18,13 +18,11 @@
package org.apache.shardingsphere.cdc.distsql.parser.core;
import org.antlr.v4.runtime.tree.ParseTree;
-import org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
-import org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
-import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.CommitStreamingContext;
-import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RollbackStreamingContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
import org.apache.shardingsphere.sql.parser.api.ASTNode;
@@ -47,13 +45,8 @@ public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVis
}
@Override
- public ASTNode visitRollbackStreaming(final RollbackStreamingContext ctx) {
- return new RollbackStreamingStatement(getIdentifierValue(ctx.jobId()));
- }
-
- @Override
- public ASTNode visitCommitStreaming(final CommitStreamingContext ctx) {
- return new CommitStreamingStatement(getIdentifierValue(ctx.jobId()));
+ public ASTNode visitDropStreaming(final DropStreamingContext ctx) {
+ return new DropStreamingStatement(getIdentifierValue(ctx.jobId()));
}
private String getIdentifierValue(final ParseTree ctx) {
diff --git a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/DropStreamingStatement.java
similarity index 90%
rename from kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
rename to kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/DropStreamingStatement.java
index 212668c3354..a60e79d4d1e 100644
--- a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
+++ b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/DropStreamingStatement.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc.UpdatableCDCRALStatement;
/**
- * Commit streaming statement.
+ * Drop streaming statement.
*/
@RequiredArgsConstructor
@Getter
-public final class CommitStreamingStatement extends UpdatableCDCRALStatement {
+public final class DropStreamingStatement extends UpdatableCDCRALStatement {
private final String jobId;
}
diff --git a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
deleted file mode 100644
index 2879748b978..00000000000
--- a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.cdc.distsql.statement;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc.UpdatableCDCRALStatement;
-
-/**
- * Rollback streaming statement.
- */
-@RequiredArgsConstructor
-@Getter
-public final class RollbackStreamingStatement extends UpdatableCDCRALStatement {
-
- private final String jobId;
-}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index d28a31f7ed8..390bfa5736e 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -29,14 +29,12 @@ import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
-import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CommitStreamingRequestBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.RollbackStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
@@ -55,7 +53,6 @@ import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.sql.SQLException;
import java.util.Objects;
import java.util.Optional;
@@ -124,11 +121,8 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
case START_STREAMING:
processStartStreamingRequest(ctx, request, connectionContext);
break;
- case ROLLBACK_STREAMING:
- processRollbackStreamingRequest(ctx, request, connectionContext);
- break;
- case COMMIT_STREAMING:
- processCommitStreamingRequest(ctx, request, connectionContext);
+ case DROP_STREAMING:
+ processDropStreamingRequest(ctx, request, connectionContext);
break;
default:
log.warn("can't handle this type of request {}", request);
@@ -215,22 +209,10 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
}
- private void processRollbackStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
- RollbackStreamingRequestBody requestBody = request.getRollbackStreamingRequestBody();
- checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
- try {
- backendHandler.rollbackStreaming(connectionContext.getJobId());
- connectionContext.setJobId(null);
- ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
- } catch (final SQLException ex) {
- throw new CDCExceptionWrapper(request.getRequestId(), new CDCServerException(ex.getMessage()));
- }
- }
-
- private void processCommitStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
- CommitStreamingRequestBody requestBody = request.getCommitStreamingRequestBody();
+ private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+ DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
- backendHandler.commitStreaming(connectionContext.getJobId());
+ backendHandler.dropStreaming(connectionContext.getJobId());
connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index b3cfafdfbdc..08ea9eca513 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -148,7 +148,7 @@ class CDCE2EIT {
tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
assertTrue(checkResult.isMatched());
- containerComposer.proxyExecuteWithLog(String.format("COMMIT STREAMING '%s'", jobId), 0);
+ containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
}
}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
index 7109e877017..b7ddd0b1560 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
@@ -19,8 +19,7 @@ package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
-import org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
import org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.UpdatablePipelineRALStatement;
import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
@@ -33,8 +32,7 @@ import org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckS
import org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.CommitStreamingStatementAssert;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.RollbackStreamingStatementAssert;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.DropStreamingStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.CheckMigrationStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.CommitMigrationStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.MigrateTableStatementAssert;
@@ -46,8 +44,7 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.r
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.UnregisterMigrationSourceStorageUnitStatementAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CommitMigrationStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.MigrateTableStatementTestCase;
@@ -96,10 +93,8 @@ public final class UpdatablePipelineRALStatementAssert {
StartMigrationCheckStatementAssert.assertIs(assertContext, (StartMigrationCheckStatement) actual, (StartMigrationCheckStatementTestCase) expected);
} else if (actual instanceof StopMigrationCheckStatement) {
StopMigrationCheckStatementAssert.assertIs(assertContext, (StopMigrationCheckStatement) actual, (StopMigrationCheckStatementTestCase) expected);
- } else if (actual instanceof RollbackStreamingStatement) {
- RollbackStreamingStatementAssert.assertIs(assertContext, (RollbackStreamingStatement) actual, (RollbackStreamingStatementTestCase) expected);
- } else if (actual instanceof CommitStreamingStatement) {
- CommitStreamingStatementAssert.assertIs(assertContext, (CommitStreamingStatement) actual, (CommitStreamingStatementTestCase) expected);
+ } else if (actual instanceof DropStreamingStatement) {
+ DropStreamingStatementAssert.assertIs(assertContext, (DropStreamingStatement) actual, (DropStreamingStatementTestCase) expected);
}
}
}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/DropStreamingStatementAssert.java
similarity index 80%
rename from test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
rename to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/DropStreamingStatementAssert.java
index 00d4ee6f594..a6f680081e0 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/DropStreamingStatementAssert.java
@@ -19,26 +19,26 @@ package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.JobIdAssert;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
/**
- * Commit streaming statement assert.
+ * Drop streaming statement assert.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CommitStreamingStatementAssert {
+public final class DropStreamingStatementAssert {
/**
- * Assert commit streaming statement is correct with expected parser result.
+ * Assert drop streaming statement is correct with expected parser result.
*
* @param assertContext assert context
* @param actual actual
* @param expected expected
*/
- public static void assertIs(final SQLCaseAssertContext assertContext, final CommitStreamingStatement actual, final CommitStreamingStatementTestCase expected) {
+ public static void assertIs(final SQLCaseAssertContext assertContext, final DropStreamingStatement actual, final DropStreamingStatementTestCase expected) {
if (ExistingAssert.assertIs(assertContext, actual, expected)) {
JobIdAssert.assertJobId(assertContext, actual.getJobId(), expected.getJobId());
}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
deleted file mode 100644
index 7d964908698..00000000000
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.JobIdAssert;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
-
-/**
- * Rollback streaming statement assert.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class RollbackStreamingStatementAssert {
-
- /**
- * Assert rollback streaming statement is correct with expected parser result.
- *
- * @param assertContext assert context
- * @param actual actual
- * @param expected expected
- */
- public static void assertIs(final SQLCaseAssertContext assertContext, final RollbackStreamingStatement actual, final RollbackStreamingStatementTestCase expected) {
- if (ExistingAssert.assertIs(assertContext, actual, expected)) {
- JobIdAssert.assertJobId(assertContext, actual.getJobId(), expected.getJobId());
- }
- }
-}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 18468059442..ccb43599023 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -321,8 +321,7 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTrafficRulesStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTransactionRuleStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.UnlabelComputeNodeStatementTestCase;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
@@ -1019,11 +1018,8 @@ public final class RootSQLParserTestCases {
@XmlElement(name = "show-streaming-status")
private final List<ShowStreamingStatusStatementTestCase> showStreamingStatusTestCases = new LinkedList<>();
- @XmlElement(name = "rollback-streaming")
- private final List<RollbackStreamingStatementTestCase> rollbackStreamingTestCases = new LinkedList<>();
-
- @XmlElement(name = "commit-streaming")
- private final List<CommitStreamingStatementTestCase> commitStreamingTestCases = new LinkedList<>();
+ @XmlElement(name = "drop-streaming")
+ private final List<DropStreamingStatementTestCase> dropStreamingTestCases = new LinkedList<>();
@XmlElement(name = "preview-sql")
private final List<PreviewStatementTestCase> previewTestCases = new LinkedList<>();
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/DropStreamingStatementTestCase.java
similarity index 90%
rename from test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
rename to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/DropStreamingStatementTestCase.java
index 695cfbd2198..192bb5461bf 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/DropStreamingStatementTestCase.java
@@ -24,11 +24,11 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.S
import javax.xml.bind.annotation.XmlElement;
/**
- * Commit streaming statement test case.
+ * Drop streaming statement test case.
*/
@Getter
@Setter
-public final class CommitStreamingStatementTestCase extends SQLParserTestCase {
+public final class DropStreamingStatementTestCase extends SQLParserTestCase {
@XmlElement(name = "job-id")
private String jobId;
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
deleted file mode 100644
index b4c20895934..00000000000
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
-
-import javax.xml.bind.annotation.XmlElement;
-
-/**
- * Rollback streaming statement test case.
- */
-@Getter
-@Setter
-public final class RollbackStreamingStatementTestCase extends SQLParserTestCase {
-
- @XmlElement(name = "job-id")
- private String jobId;
-}
diff --git a/test/it/parser/src/main/resources/case/ral/cdc.xml b/test/it/parser/src/main/resources/case/ral/cdc.xml
index 932f5818411..534077055b8 100644
--- a/test/it/parser/src/main/resources/case/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/case/ral/cdc.xml
@@ -22,12 +22,8 @@
<show-streaming-status sql-case-id="show-streaming-status">
<job-id>123</job-id>
</show-streaming-status>
-
- <rollback-streaming sql-case-id="rollback-streaming">
- <job-id>123</job-id>
- </rollback-streaming>
-
- <commit-streaming sql-case-id="commit-streaming">
+
+ <drop-streaming sql-case-id="drop-streaming">
<job-id>123</job-id>
- </commit-streaming>
+ </drop-streaming>
</sql-parser-test-cases>
diff --git a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
index 2190b38e51c..282f03b2629 100644
--- a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
@@ -19,6 +19,5 @@
<sql-cases>
<sql-case id="show-streaming-list" value="SHOW STREAMING LIST;" db-types="ShardingSphere"/>
<sql-case id="show-streaming-status" value="SHOW STREAMING STATUS 123;" db-types="ShardingSphere"/>
- <sql-case id="rollback-streaming" value="ROLLBACK STREAMING 123;" db-types="ShardingSphere"/>
- <sql-case id="commit-streaming" value="COMMIT STREAMING 123;" db-types="ShardingSphere"/>
+ <sql-case id="drop-streaming" value="DROP STREAMING 123;" db-types="ShardingSphere"/>
</sql-cases>