You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/06/14 03:51:21 UTC

[shardingsphere] branch master updated: Refactor commit rollback streaming to drop streaming (#26333)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 74179366841 Refactor commit rollback streaming to drop streaming (#26333)
74179366841 is described below

commit 74179366841bc6a9f9613710d64cca43d0c2c8d6
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Jun 14 11:51:13 2023 +0800

    Refactor commit rollback streaming to drop streaming (#26333)
    
    * Remove rollback streaming
    
    * Rename commit streaming to drop streaming
---
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  9 +++--
 .../pipeline/cdc/handler/CDCBackendHandler.java    | 17 ++------
 .../src/main/proto/CDCRequestProtocol.proto        | 12 ++----
 ...amingUpdater.java => DropStreamingUpdater.java} | 12 +++---
 .../handler/update/RollbackStreamingUpdater.java   | 42 --------------------
 ...ingsphere.distsql.handler.ral.update.RALUpdater |  3 +-
 .../distsql/parser/autogen/CDCDistSQLStatement.g4  |  3 +-
 .../parser/src/main/antlr4/imports/cdc/Keyword.g4  |  8 +---
 .../src/main/antlr4/imports/cdc/RALStatement.g4    |  8 +---
 .../parser/core/CDCDistSQLStatementVisitor.java    | 15 ++-----
 ...gStatement.java => DropStreamingStatement.java} |  4 +-
 .../statement/RollbackStreamingStatement.java      | 32 ---------------
 .../frontend/netty/CDCChannelInboundHandler.java   | 30 +++-----------
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  2 +-
 .../UpdatablePipelineRALStatementAssert.java       | 15 +++----
 ...sert.java => DropStreamingStatementAssert.java} | 12 +++---
 .../cdc/RollbackStreamingStatementAssert.java      | 46 ----------------------
 .../cases/parser/jaxb/RootSQLParserTestCases.java  | 10 ++---
 ...se.java => DropStreamingStatementTestCase.java} |  4 +-
 .../cdc/RollbackStreamingStatementTestCase.java    | 35 ----------------
 test/it/parser/src/main/resources/case/ral/cdc.xml | 10 ++---
 .../src/main/resources/sql/supported/ral/cdc.xml   |  3 +-
 22 files changed, 57 insertions(+), 275 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 60306b5bc85..c3b11e70d2f 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -350,10 +350,14 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     
     @Override
     public void commit(final String jobId) {
-        stopAndDropJob(jobId);
     }
     
-    private void stopAndDropJob(final String jobId) {
+    /**
+     * Stop and drop job.
+     *
+     * @param jobId job id
+     */
+    public void stopAndDrop(final String jobId) {
         CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
         if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
             PipelineJobCenter.stop(jobId);
@@ -365,7 +369,6 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     
     @Override
     public void rollback(final String jobId) throws SQLException {
-        stopAndDropJob(jobId);
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 69a58617e37..d13987dd3c0 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -53,7 +53,6 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.single.rule.SingleRule;
 
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -177,22 +176,12 @@ public final class CDCBackendHandler {
     }
     
     /**
-     * Rollback streaming.
+     * Drop streaming.
      *
      * @param jobId job ID
-     * @throws SQLException SQL exception
      */
-    public void rollbackStreaming(final String jobId) throws SQLException {
-        jobAPI.rollback(jobId);
-    }
-    
-    /**
-     * Commit streaming.
-     *
-     * @param jobId job ID
-     */
-    public void commitStreaming(final String jobId) {
-        jobAPI.commit(jobId);
+    public void dropStreaming(final String jobId) {
+        jobAPI.stopAndDrop(jobId);
     }
     
     /**
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 1ec532fa64e..8e322293cfa 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -31,8 +31,7 @@ message CDCRequest {
     ACK_STREAMING = 3;
     STOP_STREAMING = 4;
     START_STREAMING = 5;
-    ROLLBACK_STREAMING = 6;
-    COMMIT_STREAMING = 7;
+    DROP_STREAMING = 6;
   }
   Type type = 3;
   oneof request_body {
@@ -41,8 +40,7 @@ message CDCRequest {
     AckStreamingRequestBody ack_streaming_request_body = 6;
     StopStreamingRequestBody stop_streaming_request_body = 7;
     StartStreamingRequestBody start_streaming_request_body = 8;
-    RollbackStreamingRequestBody rollback_streaming_request_body = 9;
-    CommitStreamingRequestBody commit_streaming_request_body = 10;
+    DropStreamingRequestBody drop_streaming_request_body = 9;
   }
 }
 
@@ -84,10 +82,6 @@ message StartStreamingRequestBody {
   string streaming_id = 1;
 }
 
-message RollbackStreamingRequestBody {
-  string streaming_id = 1;
-}
-
-message CommitStreamingRequestBody {
+message DropStreamingRequestBody {
   string streaming_id = 1;
 }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
similarity index 73%
rename from kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
rename to kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
index ec8e7eddcc9..92b76b90ee1 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
@@ -17,26 +17,26 @@
 
 package org.apache.shardingsphere.cdc.distsql.handler.update;
 
-import org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 
 import java.sql.SQLException;
 
 /**
- * Commit streaming updater.
+ * Drop streaming updater.
  */
-public final class CommitStreamingUpdater implements RALUpdater<CommitStreamingStatement> {
+public final class DropStreamingUpdater implements RALUpdater<DropStreamingStatement> {
     
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
     @Override
-    public void executeUpdate(final String databaseName, final CommitStreamingStatement sqlStatement) throws SQLException {
-        jobAPI.commit(sqlStatement.getJobId());
+    public void executeUpdate(final String databaseName, final DropStreamingStatement sqlStatement) throws SQLException {
+        jobAPI.stopAndDrop(sqlStatement.getJobId());
     }
     
     @Override
     public String getType() {
-        return CommitStreamingStatement.class.getName();
+        return DropStreamingStatement.class.getName();
     }
 }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
deleted file mode 100644
index a6a2ef579d4..00000000000
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.cdc.distsql.handler.update;
-
-import org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
-import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
-
-import java.sql.SQLException;
-
-/**
- * Rollback streaming updater.
- */
-public final class RollbackStreamingUpdater implements RALUpdater<RollbackStreamingStatement> {
-    
-    private final CDCJobAPI jobAPI = new CDCJobAPI();
-    
-    @Override
-    public void executeUpdate(final String databaseName, final RollbackStreamingStatement sqlStatement) throws SQLException {
-        jobAPI.rollback(sqlStatement.getJobId());
-    }
-    
-    @Override
-    public String getType() {
-        return RollbackStreamingStatement.class.getName();
-    }
-}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
index 63de7ed6cac..0cb7002d6ab 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
+++ b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
@@ -26,5 +26,4 @@ org.apache.shardingsphere.migration.distsql.handler.update.CheckMigrationJobUpda
 org.apache.shardingsphere.migration.distsql.handler.update.StartMigrationCheckUpdater
 org.apache.shardingsphere.migration.distsql.handler.update.StopMigrationCheckUpdater
 org.apache.shardingsphere.migration.distsql.handler.update.DropMigrationCheckUpdater
-org.apache.shardingsphere.cdc.distsql.handler.update.RollbackStreamingUpdater
-org.apache.shardingsphere.cdc.distsql.handler.update.CommitStreamingUpdater
+org.apache.shardingsphere.cdc.distsql.handler.update.DropStreamingUpdater
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index ed0637e4c0b..0395586766a 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -22,7 +22,6 @@ import Symbol, RALStatement;
 execute
     : (showStreamingList
     | showStreamingStatus
-    | rollbackStreaming
-    | commitStreaming
+    | dropStreaming
     ) SEMI_? EOF
     ;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
index e5b28e62df8..9eb03da7e4d 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
@@ -39,10 +39,6 @@ STATUS
     : S T A T U S
     ;
 
-ROLLBACK
-    : R O L L B A C K
-    ;
-
-COMMIT
-    : C O M M I T
+DROP
+    : D R O P
     ;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 5f591718437..03ca7573ace 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -27,12 +27,8 @@ showStreamingStatus
     : SHOW STREAMING STATUS jobId
     ;
 
-rollbackStreaming
-    : ROLLBACK STREAMING jobId
-    ;
-
-commitStreaming
-    : COMMIT STREAMING jobId
+dropStreaming
+    : DROP STREAMING jobId
     ;
 
 jobId
diff --git a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
index 31f93c3f279..37db49fd40c 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -18,13 +18,11 @@
 package org.apache.shardingsphere.cdc.distsql.parser.core;
 
 import org.antlr.v4.runtime.tree.ParseTree;
-import org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
-import org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
 import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
 import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
-import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.CommitStreamingContext;
-import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RollbackStreamingContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
 import org.apache.shardingsphere.sql.parser.api.ASTNode;
@@ -47,13 +45,8 @@ public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVis
     }
     
     @Override
-    public ASTNode visitRollbackStreaming(final RollbackStreamingContext ctx) {
-        return new RollbackStreamingStatement(getIdentifierValue(ctx.jobId()));
-    }
-    
-    @Override
-    public ASTNode visitCommitStreaming(final CommitStreamingContext ctx) {
-        return new CommitStreamingStatement(getIdentifierValue(ctx.jobId()));
+    public ASTNode visitDropStreaming(final DropStreamingContext ctx) {
+        return new DropStreamingStatement(getIdentifierValue(ctx.jobId()));
     }
     
     private String getIdentifierValue(final ParseTree ctx) {
diff --git a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/DropStreamingStatement.java
similarity index 90%
rename from kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
rename to kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/DropStreamingStatement.java
index 212668c3354..a60e79d4d1e 100644
--- a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
+++ b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/DropStreamingStatement.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc.UpdatableCDCRALStatement;
 
 /**
- * Commit streaming statement.
+ * Drop streaming statement.
  */
 @RequiredArgsConstructor
 @Getter
-public final class CommitStreamingStatement extends UpdatableCDCRALStatement {
+public final class DropStreamingStatement extends UpdatableCDCRALStatement {
     
     private final String jobId;
 }
diff --git a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
deleted file mode 100644
index 2879748b978..00000000000
--- a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.cdc.distsql.statement;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc.UpdatableCDCRALStatement;
-
-/**
- * Rollback streaming statement.
- */
-@RequiredArgsConstructor
-@Getter
-public final class RollbackStreamingStatement extends UpdatableCDCRALStatement {
-    
-    private final String jobId;
-}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index d28a31f7ed8..390bfa5736e 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -29,14 +29,12 @@ import org.apache.shardingsphere.authority.rule.AuthorityRule;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
-import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
 import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CommitStreamingRequestBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.RollbackStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
@@ -55,7 +53,6 @@ import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.sql.SQLException;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -124,11 +121,8 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
             case START_STREAMING:
                 processStartStreamingRequest(ctx, request, connectionContext);
                 break;
-            case ROLLBACK_STREAMING:
-                processRollbackStreamingRequest(ctx, request, connectionContext);
-                break;
-            case COMMIT_STREAMING:
-                processCommitStreamingRequest(ctx, request, connectionContext);
+            case DROP_STREAMING:
+                processDropStreamingRequest(ctx, request, connectionContext);
                 break;
             default:
                 log.warn("can't handle this type of request {}", request);
@@ -215,22 +209,10 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
         ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
     }
     
-    private void processRollbackStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
-        RollbackStreamingRequestBody requestBody = request.getRollbackStreamingRequestBody();
-        checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
-        try {
-            backendHandler.rollbackStreaming(connectionContext.getJobId());
-            connectionContext.setJobId(null);
-            ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
-        } catch (final SQLException ex) {
-            throw new CDCExceptionWrapper(request.getRequestId(), new CDCServerException(ex.getMessage()));
-        }
-    }
-    
-    private void processCommitStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
-        CommitStreamingRequestBody requestBody = request.getCommitStreamingRequestBody();
+    private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
-        backendHandler.commitStreaming(connectionContext.getJobId());
+        backendHandler.dropStreaming(connectionContext.getJobId());
         connectionContext.setJobId(null);
         ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index b3cfafdfbdc..08ea9eca513 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -148,7 +148,7 @@ class CDCE2EIT {
                     tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
             DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
             assertTrue(checkResult.isMatched());
-            containerComposer.proxyExecuteWithLog(String.format("COMMIT STREAMING '%s'", jobId), 0);
+            containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
             assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
         }
     }
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
index 7109e877017..b7ddd0b1560 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
@@ -19,8 +19,7 @@ package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
-import org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
 import org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.UpdatablePipelineRALStatement;
 import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
 import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
@@ -33,8 +32,7 @@ import org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckS
 import org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
 import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.CommitStreamingStatementAssert;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.RollbackStreamingStatementAssert;
+import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.DropStreamingStatementAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.CheckMigrationStatementAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.CommitMigrationStatementAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.MigrateTableStatementAssert;
@@ -46,8 +44,7 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.r
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationStatementAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.UnregisterMigrationSourceStorageUnitStatementAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CommitMigrationStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.MigrateTableStatementTestCase;
@@ -96,10 +93,8 @@ public final class UpdatablePipelineRALStatementAssert {
             StartMigrationCheckStatementAssert.assertIs(assertContext, (StartMigrationCheckStatement) actual, (StartMigrationCheckStatementTestCase) expected);
         } else if (actual instanceof StopMigrationCheckStatement) {
             StopMigrationCheckStatementAssert.assertIs(assertContext, (StopMigrationCheckStatement) actual, (StopMigrationCheckStatementTestCase) expected);
-        } else if (actual instanceof RollbackStreamingStatement) {
-            RollbackStreamingStatementAssert.assertIs(assertContext, (RollbackStreamingStatement) actual, (RollbackStreamingStatementTestCase) expected);
-        } else if (actual instanceof CommitStreamingStatement) {
-            CommitStreamingStatementAssert.assertIs(assertContext, (CommitStreamingStatement) actual, (CommitStreamingStatementTestCase) expected);
+        } else if (actual instanceof DropStreamingStatement) {
+            DropStreamingStatementAssert.assertIs(assertContext, (DropStreamingStatement) actual, (DropStreamingStatementTestCase) expected);
         }
     }
 }
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/DropStreamingStatementAssert.java
similarity index 80%
rename from test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
rename to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/DropStreamingStatementAssert.java
index 00d4ee6f594..a6f680081e0 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/DropStreamingStatementAssert.java
@@ -19,26 +19,26 @@ package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
 import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.JobIdAssert;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
 
 /**
- * Commit streaming statement assert.
+ * Drop streaming statement assert.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CommitStreamingStatementAssert {
+public final class DropStreamingStatementAssert {
     
     /**
-     * Assert commit streaming statement is correct with expected parser result.
+     * Assert drop streaming statement is correct with expected parser result.
      *
      * @param assertContext assert context
      * @param actual        actual
      * @param expected      expected
      */
-    public static void assertIs(final SQLCaseAssertContext assertContext, final CommitStreamingStatement actual, final CommitStreamingStatementTestCase expected) {
+    public static void assertIs(final SQLCaseAssertContext assertContext, final DropStreamingStatement actual, final DropStreamingStatementTestCase expected) {
         if (ExistingAssert.assertIs(assertContext, actual, expected)) {
             JobIdAssert.assertJobId(assertContext, actual.getJobId(), expected.getJobId());
         }
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
deleted file mode 100644
index 7d964908698..00000000000
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
-import org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.JobIdAssert;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
-
-/**
- * Rollback streaming statement assert.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class RollbackStreamingStatementAssert {
-    
-    /**
-     * Assert rollback streaming statement is correct with expected parser result.
-     *
-     * @param assertContext assert context
-     * @param actual        actual
-     * @param expected      expected
-     */
-    public static void assertIs(final SQLCaseAssertContext assertContext, final RollbackStreamingStatement actual, final RollbackStreamingStatementTestCase expected) {
-        if (ExistingAssert.assertIs(assertContext, actual, expected)) {
-            JobIdAssert.assertJobId(assertContext, actual.getJobId(), expected.getJobId());
-        }
-    }
-}
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 18468059442..ccb43599023 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -321,8 +321,7 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTrafficRulesStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTransactionRuleStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.UnlabelComputeNodeStatementTestCase;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
+import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
 import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
@@ -1019,11 +1018,8 @@ public final class RootSQLParserTestCases {
     @XmlElement(name = "show-streaming-status")
     private final List<ShowStreamingStatusStatementTestCase> showStreamingStatusTestCases = new LinkedList<>();
     
-    @XmlElement(name = "rollback-streaming")
-    private final List<RollbackStreamingStatementTestCase> rollbackStreamingTestCases = new LinkedList<>();
-    
-    @XmlElement(name = "commit-streaming")
-    private final List<CommitStreamingStatementTestCase> commitStreamingTestCases = new LinkedList<>();
+    @XmlElement(name = "drop-streaming")
+    private final List<DropStreamingStatementTestCase> dropStreamingTestCases = new LinkedList<>();
     
     @XmlElement(name = "preview-sql")
     private final List<PreviewStatementTestCase> previewTestCases = new LinkedList<>();
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/DropStreamingStatementTestCase.java
similarity index 90%
rename from test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
rename to test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/DropStreamingStatementTestCase.java
index 695cfbd2198..192bb5461bf 100644
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
+++ b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/DropStreamingStatementTestCase.java
@@ -24,11 +24,11 @@ import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.S
 import javax.xml.bind.annotation.XmlElement;
 
 /**
- * Commit streaming statement test case.
+ * Drop streaming statement test case.
  */
 @Getter
 @Setter
-public final class CommitStreamingStatementTestCase extends SQLParserTestCase {
+public final class DropStreamingStatementTestCase extends SQLParserTestCase {
     
     @XmlElement(name = "job-id")
     private String jobId;
diff --git a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
deleted file mode 100644
index b4c20895934..00000000000
--- a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
-
-import javax.xml.bind.annotation.XmlElement;
-
-/**
- * Rollback streaming statement test case.
- */
-@Getter
-@Setter
-public final class RollbackStreamingStatementTestCase extends SQLParserTestCase {
-    
-    @XmlElement(name = "job-id")
-    private String jobId;
-}
diff --git a/test/it/parser/src/main/resources/case/ral/cdc.xml b/test/it/parser/src/main/resources/case/ral/cdc.xml
index 932f5818411..534077055b8 100644
--- a/test/it/parser/src/main/resources/case/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/case/ral/cdc.xml
@@ -22,12 +22,8 @@
     <show-streaming-status sql-case-id="show-streaming-status">
         <job-id>123</job-id>
     </show-streaming-status>
-
-    <rollback-streaming sql-case-id="rollback-streaming">
-        <job-id>123</job-id>
-    </rollback-streaming>
-
-    <commit-streaming sql-case-id="commit-streaming">
+    
+    <drop-streaming sql-case-id="drop-streaming">
         <job-id>123</job-id>
-    </commit-streaming>
+    </drop-streaming>
 </sql-parser-test-cases>
diff --git a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
index 2190b38e51c..282f03b2629 100644
--- a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
@@ -19,6 +19,5 @@
 <sql-cases>
     <sql-case id="show-streaming-list" value="SHOW STREAMING LIST;" db-types="ShardingSphere"/>
     <sql-case id="show-streaming-status" value="SHOW STREAMING STATUS 123;" db-types="ShardingSphere"/>
-    <sql-case id="rollback-streaming" value="ROLLBACK STREAMING 123;" db-types="ShardingSphere"/>
-    <sql-case id="commit-streaming" value="COMMIT STREAMING 123;" db-types="ShardingSphere"/>
+    <sql-case id="drop-streaming" value="DROP STREAMING 123;" db-types="ShardingSphere"/>
 </sql-cases>