You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/04/10 10:41:01 UTC

[shardingsphere] branch master updated: Fix MySQL packet out of order when client sending pipelining requests (#25081)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 496b5dd965d Fix MySQL packet out of order when client sending pipelining requests (#25081)
496b5dd965d is described below

commit 496b5dd965dcf1ee8bf67172097554dc408082b0
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Mon Apr 10 18:40:52 2023 +0800

    Fix MySQL packet out of order when client sending pipelining requests (#25081)
    
    * Handle MySQL sequence ID after flow control
    
    * Add ProxyFlowControlHandler
    
    * Remove TODO in MySQLPacketCodecEngine
    
    * Add MySQLSequenceIDInboundHandlerTest
    
    * Revert MySQLAuthenticationEngine
    
    * Complete ServerHandlerInitializerTest
    
    * Add ProxyFlowControlHandlerTest
    
    * Complete MySQLPacketCodecEngineTest
    
    * Complete OKProxyStateTest
    
    * Read slice in MySQLSequenceIDInboundHandler
---
 db-protocol/core/pom.xml                           |  2 +-
 .../db/protocol/event/WriteCompleteEvent.java      | 35 +++++++++++
 .../db/protocol/netty/ProxyFlowControlHandler.java | 36 +++++++++++
 .../netty/ProxyFlowControlHandlerTest.java         | 47 +++++++++++++++
 .../mysql/codec/MySQLPacketCodecEngine.java        |  9 ++-
 .../mysql/netty/MySQLSequenceIDInboundHandler.java | 37 ++++++++++++
 .../mysql/codec/MySQLPacketCodecEngineTest.java    |  4 +-
 .../netty/MySQLSequenceIDInboundHandlerTest.java   | 69 ++++++++++++++++++++++
 .../pipeline/mysql/ingest/client/MySQLClient.java  |  2 +
 .../frontend/command/CommandExecutorTask.java      |  2 +
 .../frontend/netty/ServerHandlerInitializer.java   |  6 +-
 .../proxy/frontend/state/impl/OKProxyState.java    |  1 +
 .../netty/ServerHandlerInitializerTest.java        |  5 +-
 .../frontend/state/impl/OKProxyStateTest.java      | 23 ++++++--
 .../proxy/frontend/mysql/MySQLFrontendEngine.java  |  3 +
 15 files changed, 266 insertions(+), 15 deletions(-)

diff --git a/db-protocol/core/pom.xml b/db-protocol/core/pom.xml
index 12b493c9260..32dfd426701 100644
--- a/db-protocol/core/pom.xml
+++ b/db-protocol/core/pom.xml
@@ -36,7 +36,7 @@
         
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty-codec</artifactId>
+            <artifactId>netty-handler</artifactId>
         </dependency>
     </dependencies>
 </project>
diff --git a/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/event/WriteCompleteEvent.java b/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/event/WriteCompleteEvent.java
new file mode 100644
index 00000000000..e8426d00d0e
--- /dev/null
+++ b/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/event/WriteCompleteEvent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.event;
+
+/**
+ * Write complete event, a stateless marker object fired through the channel pipeline as a user event; receivers recognize it by identity against {@link #getInstance()}.
+ */
+public final class WriteCompleteEvent {
+    // NOTE(review): the implicit public constructor still allows extra instances, which would not match identity checks against INSTANCE; consider making it private.
+    private static final WriteCompleteEvent INSTANCE = new WriteCompleteEvent();
+    
+    /**
+     * Get the shared instance of {@link WriteCompleteEvent}; every call returns the same object.
+     *
+     * @return the single shared instance of {@link WriteCompleteEvent}
+     */
+    public static WriteCompleteEvent getInstance() {
+        return INSTANCE;
+    }
+}
diff --git a/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandler.java b/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandler.java
new file mode 100644
index 00000000000..c8c90fd31a2
--- /dev/null
+++ b/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.flow.FlowControlHandler;
+import org.apache.shardingsphere.db.protocol.event.WriteCompleteEvent;
+
+/**
+ * Flow control handler for ShardingSphere-Proxy: switches channel auto-read back on when the shared {@link WriteCompleteEvent} arrives and forwards every user event to the next handler.
+ */
+public final class ProxyFlowControlHandler extends FlowControlHandler {
+    // Identity comparison is deliberate: only the shared WriteCompleteEvent instance re-enables auto-read; the event is forwarded downstream in every case.
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+        if (WriteCompleteEvent.getInstance() == evt) {
+            ctx.channel().config().setAutoRead(true);
+        }
+        ctx.fireUserEventTriggered(evt);
+    }
+}
diff --git a/db-protocol/core/src/test/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandlerTest.java b/db-protocol/core/src/test/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandlerTest.java
new file mode 100644
index 00000000000..bb15481d1d4
--- /dev/null
+++ b/db-protocol/core/src/test/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandlerTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.shardingsphere.db.protocol.event.WriteCompleteEvent;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ProxyFlowControlHandlerTest {
+    // Fires WriteCompleteEvent into a pipeline whose auto-read was switched off and expects auto-read to be on again and the event to reach the next handler.
+    @Test
+    void assertUserEventTriggered() {
+        AtomicBoolean eventReceived = new AtomicBoolean(false);
+        EmbeddedChannel channel = new EmbeddedChannel(new ProxyFlowControlHandler(), new ChannelInboundHandlerAdapter() {
+            // Downstream probe: records whether the forwarded event is the shared WriteCompleteEvent instance.
+            @Override
+            public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+                eventReceived.set(WriteCompleteEvent.getInstance() == evt);
+            }
+        });
+        channel.config().setAutoRead(false);
+        channel.pipeline().fireUserEventTriggered(WriteCompleteEvent.getInstance());
+        assertTrue(channel.config().isAutoRead());
+        assertTrue(eventReceived.get());
+    }
+}
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
index 0dc9b2797fe..18f802355aa 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
@@ -61,11 +61,9 @@ public final class MySQLPacketCodecEngine implements DatabasePacketCodecEngine<M
             in.resetReaderIndex();
             return;
         }
-        short sequenceId = in.readUnsignedByte();
-        context.channel().attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().set(sequenceId + 1);
-        ByteBuf message = in.readRetainedSlice(payloadLength);
+        ByteBuf message = in.readRetainedSlice(remainPayloadLength);
         if (MAX_PACKET_LENGTH == payloadLength) {
-            pendingMessages.add(message);
+            pendingMessages.add(message.skipBytes(SEQUENCE_LENGTH));
         } else if (pendingMessages.isEmpty()) {
             out.add(message);
         } else {
@@ -74,7 +72,8 @@ public final class MySQLPacketCodecEngine implements DatabasePacketCodecEngine<M
     }
     
     private void aggregateMessages(final ChannelHandlerContext context, final ByteBuf lastMessage, final List<Object> out) {
-        CompositeByteBuf result = context.alloc().compositeBuffer(pendingMessages.size() + 1);
+        CompositeByteBuf result = context.alloc().compositeBuffer(SEQUENCE_LENGTH + pendingMessages.size() + 1);
+        result.addComponent(true, lastMessage.readSlice(SEQUENCE_LENGTH));
         Iterator<ByteBuf> pendingMessagesIterator = pendingMessages.iterator();
         result.addComponent(true, pendingMessagesIterator.next());
         while (pendingMessagesIterator.hasNext()) {
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandler.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandler.java
new file mode 100644
index 00000000000..ec7d6cf1b5a
--- /dev/null
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.mysql.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+
+/**
+ * Handle MySQL sequence ID before sending to downstream: reads the leading sequence ID byte, stores that value plus one in the channel's {@link MySQLConstants#MYSQL_SEQUENCE_ID} attribute and forwards the remaining bytes.
+ */
+public final class MySQLSequenceIDInboundHandler extends ChannelInboundHandlerAdapter {
+    // NOTE(review): assumes msg is a ByteBuf starting with the sequence ID byte and that the MYSQL_SEQUENCE_ID attribute is already set on the channel; neither is checked here.
+    @Override
+    public void channelRead(final ChannelHandlerContext context, final Object msg) {
+        ByteBuf byteBuf = (ByteBuf) msg;
+        short sequenceId = byteBuf.readUnsignedByte();
+        context.channel().attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().set(sequenceId + 1);
+        context.fireChannelRead(byteBuf.readSlice(byteBuf.readableBytes()));
+    }
+}
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
index fa657cb9204..f8c88998269 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
@@ -112,13 +112,13 @@ class MySQLPacketCodecEngineTest {
     @Test
     void assertDecodePacketMoreThan16MB() {
         MySQLPacketCodecEngine engine = new MySQLPacketCodecEngine();
-        when(context.alloc().compositeBuffer(2)).thenReturn(new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 2));
+        when(context.alloc().compositeBuffer(3)).thenReturn(new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 3));
         List<Object> actual = new ArrayList<>(1);
         for (ByteBuf each : preparePacketMoreThan16MB()) {
             engine.decode(context, each, actual);
         }
         assertThat(actual.size(), is(1));
-        assertThat(((ByteBuf) actual.get(0)).readableBytes(), is((1 << 24) - 1));
+        assertThat(((ByteBuf) actual.get(0)).readableBytes(), is(1 << 24));
     }
     
     private List<ByteBuf> preparePacketMoreThan16MB() {
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandlerTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandlerTest.java
new file mode 100644
index 00000000000..17f422896f3
--- /dev/null
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandlerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.mysql.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.shardingsphere.db.protocol.event.WriteCompleteEvent;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import org.apache.shardingsphere.db.protocol.netty.ProxyFlowControlHandler;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class MySQLSequenceIDInboundHandlerTest {
+    // Writes three one-byte inbound packets (sequence ID 0) while the inbound fixture turns auto-read off per message; each outbound reply is expected to carry sequence ID 1.
+    @Test
+    void assertChannelReadWithFlowControl() {
+        EmbeddedChannel channel = new EmbeddedChannel(new FixtureOutboundHandler(), new ProxyFlowControlHandler(), new MySQLSequenceIDInboundHandler(), new FixtureInboundHandler());
+        channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger());
+        channel.writeInbound(Unpooled.wrappedBuffer(new byte[1]), Unpooled.wrappedBuffer(new byte[1]), Unpooled.wrappedBuffer(new byte[1]));
+        assertThat(channel.<ByteBuf>readOutbound().readUnsignedByte(), is((short) 1));
+        assertThat(channel.<ByteBuf>readOutbound().readUnsignedByte(), is((short) 1));
+        assertThat(channel.<ByteBuf>readOutbound().readUnsignedByte(), is((short) 1));
+    }
+    
+    private static class FixtureOutboundHandler extends ChannelOutboundHandlerAdapter {
+        // Replaces each outbound message with a single byte holding the channel's current sequence ID, then increments the stored value.
+        @Override
+        public void write(final ChannelHandlerContext context, final Object msg, final ChannelPromise promise) {
+            byte sequenceId = (byte) context.channel().attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().getAndIncrement();
+            context.writeAndFlush(Unpooled.wrappedBuffer(new byte[]{sequenceId}));
+        }
+    }
+    
+    private static class FixtureInboundHandler extends ChannelInboundHandlerAdapter {
+        // Stands in for command handling: turns auto-read off, then on the context's executor writes an empty buffer and fires WriteCompleteEvent through the pipeline.
+        @Override
+        public void channelRead(final ChannelHandlerContext context, final Object msg) {
+            context.channel().config().setAutoRead(false);
+            context.executor().execute(() -> {
+                context.writeAndFlush(Unpooled.EMPTY_BUFFER);
+                context.channel().pipeline().fireUserEventTriggered(WriteCompleteEvent.getInstance());
+            });
+        }
+    }
+}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index c5a0dac580f..44a5a0ea659 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -43,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLNe
 import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
 import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import org.apache.shardingsphere.db.protocol.mysql.netty.MySQLSequenceIDInboundHandler;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
@@ -101,6 +102,7 @@ public final class MySQLClient {
                         socketChannel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger());
                         socketChannel.pipeline().addLast(new ChannelAttrInitializer());
                         socketChannel.pipeline().addLast(new PacketCodec(new MySQLPacketCodecEngine()));
+                        socketChannel.pipeline().addLast(new MySQLSequenceIDInboundHandler());
                         socketChannel.pipeline().addLast(new MySQLNegotiatePackageDecoder());
                         socketChannel.pipeline().addLast(new MySQLCommandPacketDecoder());
                         socketChannel.pipeline().addLast(new MySQLNegotiateHandler(connectInfo.getUsername(), connectInfo.getPassword(), responseCallback));
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index c22f7c884f8..de85ab0fef5 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
+import org.apache.shardingsphere.db.protocol.event.WriteCompleteEvent;
 import org.apache.shardingsphere.db.protocol.packet.CommandPacket;
 import org.apache.shardingsphere.db.protocol.packet.CommandPacketType;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
@@ -95,6 +96,7 @@ public final class CommandExecutorTask implements Runnable {
                 context.flush();
             }
             processClosedExceptions(exceptions);
+            context.pipeline().fireUserEventTriggered(WriteCompleteEvent.getInstance());
             if (sqlShowEnabled) {
                 clearLogMDC();
             }
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
index 6f9b275d477..e6a0e222c31 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
@@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
 import org.apache.shardingsphere.db.protocol.netty.ChannelAttrInitializer;
+import org.apache.shardingsphere.db.protocol.netty.ProxyFlowControlHandler;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
@@ -40,11 +41,12 @@ public final class ServerHandlerInitializer extends ChannelInitializer<Channel>
     @Override
     protected void initChannel(final Channel socketChannel) {
         DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine = TypedSPILoader.getService(DatabaseProtocolFrontendEngine.class, databaseType.getType());
-        databaseProtocolFrontendEngine.initChannel(socketChannel);
         ChannelPipeline pipeline = socketChannel.pipeline();
         pipeline.addLast(new ChannelAttrInitializer());
         pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
         pipeline.addLast(new FrontendChannelLimitationInboundHandler(databaseProtocolFrontendEngine));
-        pipeline.addLast(new FrontendChannelInboundHandler(databaseProtocolFrontendEngine, socketChannel));
+        pipeline.addLast(ProxyFlowControlHandler.class.getSimpleName(), new ProxyFlowControlHandler());
+        pipeline.addLast(FrontendChannelInboundHandler.class.getSimpleName(), new FrontendChannelInboundHandler(databaseProtocolFrontendEngine, socketChannel));
+        databaseProtocolFrontendEngine.initChannel(socketChannel);
     }
 }
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
index cecac1a52bb..91003559872 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
@@ -40,6 +40,7 @@ public final class OKProxyState implements ProxyState {
     public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final ConnectionSession connectionSession) {
         CommandExecutorTask commandExecutorTask = new CommandExecutorTask(databaseProtocolFrontendEngine, connectionSession, context, message);
         ExecutorService executorService = determineSuitableExecutorService(context, message, databaseProtocolFrontendEngine, connectionSession);
+        context.channel().config().setAutoRead(false);
         executorService.execute(commandExecutorTask);
     }
     
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializerTest.java
index eccf20e940a..015aff474a7 100644
--- a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializerTest.java
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializerTest.java
@@ -21,6 +21,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
 import org.apache.shardingsphere.db.protocol.netty.ChannelAttrInitializer;
+import org.apache.shardingsphere.db.protocol.netty.ProxyFlowControlHandler;
 import org.apache.shardingsphere.test.fixture.infra.database.type.MockedDatabaseType;
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.mock.ConstructionMockSettings;
@@ -28,6 +29,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -45,6 +47,7 @@ class ServerHandlerInitializerTest {
         verify(pipeline).addLast(any(ChannelAttrInitializer.class));
         verify(pipeline).addLast(any(PacketCodec.class));
         verify(pipeline).addLast(any(FrontendChannelLimitationInboundHandler.class));
-        verify(pipeline).addLast(any(FrontendChannelInboundHandler.class));
+        verify(pipeline).addLast(eq(ProxyFlowControlHandler.class.getSimpleName()), any(ProxyFlowControlHandler.class));
+        verify(pipeline).addLast(eq(FrontendChannelInboundHandler.class.getSimpleName()), any(FrontendChannelInboundHandler.class));
     }
 }
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyStateTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyStateTest.java
index 0454002c793..bdfa02fc25b 100644
--- a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyStateTest.java
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyStateTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.proxy.frontend.state.impl;
 
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.util.concurrent.EventExecutor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.config.props.BackendExecutorType;
@@ -31,6 +32,8 @@ import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.mock.StaticMockSettings;
 import org.apache.shardingsphere.transaction.api.TransactionType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.internal.configuration.plugins.Plugins;
@@ -48,6 +51,19 @@ import static org.mockito.Mockito.when;
 @StaticMockSettings(ProxyContext.class)
 class OKProxyStateTest {
     
+    private ChannelHandlerContext context;
+    
+    @BeforeEach
+    void setup() {
+        context = mock(ChannelHandlerContext.class);
+        when(context.channel()).thenReturn(new EmbeddedChannel());
+    }
+    
+    @AfterEach
+    void tearDown() {
+        context.channel().close().syncUninterruptibly();
+    }
+    
     @Test
     void assertExecuteWithProxyHintEnabled() {
         ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
@@ -56,7 +72,7 @@ class OKProxyStateTest {
         ConnectionSession connectionSession = mock(ConnectionSession.class, RETURNS_DEEP_STUBS);
         when(connectionSession.getConnectionId()).thenReturn(1);
         ExecutorService executorService = registerMockExecutorService(1);
-        new OKProxyState().execute(mock(ChannelHandlerContext.class), null, mock(DatabaseProtocolFrontendEngine.class), connectionSession);
+        new OKProxyState().execute(context, null, mock(DatabaseProtocolFrontendEngine.class), connectionSession);
         verify(executorService).execute(any(CommandExecutorTask.class));
         ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(1);
     }
@@ -70,7 +86,7 @@ class OKProxyStateTest {
         when(connectionSession.getTransactionStatus().getTransactionType()).thenReturn(TransactionType.XA);
         when(connectionSession.getConnectionId()).thenReturn(1);
         ExecutorService executorService = registerMockExecutorService(1);
-        new OKProxyState().execute(mock(ChannelHandlerContext.class), null, mock(DatabaseProtocolFrontendEngine.class), connectionSession);
+        new OKProxyState().execute(context, null, mock(DatabaseProtocolFrontendEngine.class), connectionSession);
         verify(executorService).execute(any(CommandExecutorTask.class));
         ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(1);
     }
@@ -83,7 +99,6 @@ class OKProxyStateTest {
                 ConfigurationPropertyKey.PROXY_BACKEND_EXECUTOR_SUITABLE)).thenReturn(BackendExecutorType.OLTP);
         when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
         EventExecutor eventExecutor = mock(EventExecutor.class);
-        ChannelHandlerContext context = mock(ChannelHandlerContext.class);
         when(context.executor()).thenReturn(eventExecutor);
         new OKProxyState().execute(context, null, mock(DatabaseProtocolFrontendEngine.class), mock(ConnectionSession.class, RETURNS_DEEP_STUBS));
         verify(eventExecutor).execute(any(CommandExecutorTask.class));
@@ -101,7 +116,7 @@ class OKProxyStateTest {
         DatabaseProtocolFrontendEngine frontendEngine = mock(DatabaseProtocolFrontendEngine.class, RETURNS_DEEP_STUBS);
         when(frontendEngine.getFrontendContext().isRequiredSameThreadForConnection(null)).thenReturn(true);
         ExecutorService executorService = registerMockExecutorService(1);
-        new OKProxyState().execute(mock(ChannelHandlerContext.class), null, frontendEngine, connectionSession);
+        new OKProxyState().execute(context, null, frontendEngine, connectionSession);
         verify(executorService).execute(any(CommandExecutorTask.class));
         ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(1);
     }
diff --git a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
index ea9617241f5..4debe8e9ee7 100644
--- a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
+++ b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
@@ -33,6 +33,8 @@ import org.apache.shardingsphere.proxy.frontend.context.FrontendContext;
 import org.apache.shardingsphere.proxy.frontend.mysql.authentication.MySQLAuthenticationEngine;
 import org.apache.shardingsphere.proxy.frontend.mysql.command.MySQLCommandExecuteEngine;
 import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
+import org.apache.shardingsphere.db.protocol.mysql.netty.MySQLSequenceIDInboundHandler;
+import org.apache.shardingsphere.proxy.frontend.netty.FrontendChannelInboundHandler;
 import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
 
 import java.util.concurrent.atomic.AtomicInteger;
@@ -59,6 +61,7 @@ public final class MySQLFrontendEngine implements DatabaseProtocolFrontendEngine
     @Override
     public void initChannel(final Channel channel) {
         channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger());
+        channel.pipeline().addBefore(FrontendChannelInboundHandler.class.getSimpleName(), MySQLSequenceIDInboundHandler.class.getSimpleName(), new MySQLSequenceIDInboundHandler());
     }
     
     @Override