You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/04/10 10:41:01 UTC
[shardingsphere] branch master updated: Fix MySQL packet out of order when client sending pipelining requests (#25081)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 496b5dd965d Fix MySQL packet out of order when client sending pipelining requests (#25081)
496b5dd965d is described below
commit 496b5dd965dcf1ee8bf67172097554dc408082b0
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Mon Apr 10 18:40:52 2023 +0800
Fix MySQL packet out of order when client sending pipelining requests (#25081)
* Handle MySQL sequence ID after flow control
* Add ProxyFlowControlHandler
* Remove TODO in MySQLPacketCodecEngine
* Add MySQLSequenceIDInboundHandlerTest
* Revert MySQLAuthenticationEngine
* Complete ServerHandlerInitializerTest
* Add ProxyFlowControlHandlerTest
* Complete MySQLPacketCodecEngineTest
* Complete OKProxyStateTest
* Read slice in MySQLSequenceIDInboundHandler
---
db-protocol/core/pom.xml | 2 +-
.../db/protocol/event/WriteCompleteEvent.java | 35 +++++++++++
.../db/protocol/netty/ProxyFlowControlHandler.java | 36 +++++++++++
.../netty/ProxyFlowControlHandlerTest.java | 47 +++++++++++++++
.../mysql/codec/MySQLPacketCodecEngine.java | 9 ++-
.../mysql/netty/MySQLSequenceIDInboundHandler.java | 37 ++++++++++++
.../mysql/codec/MySQLPacketCodecEngineTest.java | 4 +-
.../netty/MySQLSequenceIDInboundHandlerTest.java | 69 ++++++++++++++++++++++
.../pipeline/mysql/ingest/client/MySQLClient.java | 2 +
.../frontend/command/CommandExecutorTask.java | 2 +
.../frontend/netty/ServerHandlerInitializer.java | 6 +-
.../proxy/frontend/state/impl/OKProxyState.java | 1 +
.../netty/ServerHandlerInitializerTest.java | 5 +-
.../frontend/state/impl/OKProxyStateTest.java | 23 ++++++--
.../proxy/frontend/mysql/MySQLFrontendEngine.java | 3 +
15 files changed, 266 insertions(+), 15 deletions(-)
diff --git a/db-protocol/core/pom.xml b/db-protocol/core/pom.xml
index 12b493c9260..32dfd426701 100644
--- a/db-protocol/core/pom.xml
+++ b/db-protocol/core/pom.xml
@@ -36,7 +36,7 @@
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty-codec</artifactId>
+ <artifactId>netty-handler</artifactId>
</dependency>
</dependencies>
</project>
diff --git a/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/event/WriteCompleteEvent.java b/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/event/WriteCompleteEvent.java
new file mode 100644
index 00000000000..e8426d00d0e
--- /dev/null
+++ b/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/event/WriteCompleteEvent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.event;
+
+/**
+ * Write complete event.
+ *
+ * <p>Stateless marker object fired through a channel pipeline as a user event.
+ * It is compared by identity against {@link #getInstance()}, so exactly one instance may exist.</p>
+ */
+public final class WriteCompleteEvent {
+
+    private static final WriteCompleteEvent INSTANCE = new WriteCompleteEvent();
+
+    // Without an explicit constructor the compiler supplies a public one, which would let callers
+    // create additional instances that never match an identity check against INSTANCE.
+    private WriteCompleteEvent() {
+    }
+
+    /**
+     * Get instance of {@link WriteCompleteEvent}.
+     *
+     * @return instance of {@link WriteCompleteEvent}
+     */
+    public static WriteCompleteEvent getInstance() {
+        return INSTANCE;
+    }
+}
diff --git a/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandler.java b/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandler.java
new file mode 100644
index 00000000000..c8c90fd31a2
--- /dev/null
+++ b/db-protocol/core/src/main/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.flow.FlowControlHandler;
+import org.apache.shardingsphere.db.protocol.event.WriteCompleteEvent;
+
+/**
+ * Flow control handler for ShardingSphere-Proxy.
+ */
+public final class ProxyFlowControlHandler extends FlowControlHandler {
+
+ @Override
+ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+ if (WriteCompleteEvent.getInstance() == evt) {
+ ctx.channel().config().setAutoRead(true);
+ }
+ ctx.fireUserEventTriggered(evt);
+ }
+}
diff --git a/db-protocol/core/src/test/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandlerTest.java b/db-protocol/core/src/test/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandlerTest.java
new file mode 100644
index 00000000000..bb15481d1d4
--- /dev/null
+++ b/db-protocol/core/src/test/java/org/apache/shardingsphere/db/protocol/netty/ProxyFlowControlHandlerTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.shardingsphere.db.protocol.event.WriteCompleteEvent;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ProxyFlowControlHandlerTest {
+
+ @Test
+ void assertUserEventTriggered() {
+ AtomicBoolean eventReceived = new AtomicBoolean(false);
+ EmbeddedChannel channel = new EmbeddedChannel(new ProxyFlowControlHandler(), new ChannelInboundHandlerAdapter() {
+
+ @Override
+ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+ eventReceived.set(WriteCompleteEvent.getInstance() == evt);
+ }
+ });
+ channel.config().setAutoRead(false);
+ channel.pipeline().fireUserEventTriggered(WriteCompleteEvent.getInstance());
+ assertTrue(channel.config().isAutoRead());
+ assertTrue(eventReceived.get());
+ }
+}
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
index 0dc9b2797fe..18f802355aa 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngine.java
@@ -61,11 +61,9 @@ public final class MySQLPacketCodecEngine implements DatabasePacketCodecEngine<M
in.resetReaderIndex();
return;
}
- short sequenceId = in.readUnsignedByte();
- context.channel().attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().set(sequenceId + 1);
- ByteBuf message = in.readRetainedSlice(payloadLength);
+ ByteBuf message = in.readRetainedSlice(remainPayloadLength);
if (MAX_PACKET_LENGTH == payloadLength) {
- pendingMessages.add(message);
+ pendingMessages.add(message.skipBytes(SEQUENCE_LENGTH));
} else if (pendingMessages.isEmpty()) {
out.add(message);
} else {
@@ -74,7 +72,8 @@ public final class MySQLPacketCodecEngine implements DatabasePacketCodecEngine<M
}
private void aggregateMessages(final ChannelHandlerContext context, final ByteBuf lastMessage, final List<Object> out) {
- CompositeByteBuf result = context.alloc().compositeBuffer(pendingMessages.size() + 1);
+ CompositeByteBuf result = context.alloc().compositeBuffer(SEQUENCE_LENGTH + pendingMessages.size() + 1);
+ result.addComponent(true, lastMessage.readSlice(SEQUENCE_LENGTH));
Iterator<ByteBuf> pendingMessagesIterator = pendingMessages.iterator();
result.addComponent(true, pendingMessagesIterator.next());
while (pendingMessagesIterator.hasNext()) {
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandler.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandler.java
new file mode 100644
index 00000000000..ec7d6cf1b5a
--- /dev/null
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.mysql.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+
+/**
+ * Inbound handler that handles the MySQL sequence ID before the message is sent downstream.
+ */
+public final class MySQLSequenceIDInboundHandler extends ChannelInboundHandlerAdapter {
+
+    /**
+     * Read the leading unsigned byte of the message as the sequence ID, store that value plus one in the
+     * channel attribute {@link MySQLConstants#MYSQL_SEQUENCE_ID}, and fire the remaining readable bytes
+     * to the next inbound handler.
+     *
+     * @param context channel handler context
+     * @param msg inbound message; it is cast to {@link ByteBuf} without a type check
+     */
+    @Override
+    public void channelRead(final ChannelHandlerContext context, final Object msg) {
+        ByteBuf packet = (ByteBuf) msg;
+        int nextSequenceId = packet.readUnsignedByte() + 1;
+        context.channel().attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().set(nextSequenceId);
+        // Everything after the sequence ID byte is handed on as a slice of the same buffer.
+        ByteBuf payload = packet.readSlice(packet.readableBytes());
+        context.fireChannelRead(payload);
+    }
+}
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
index fa657cb9204..f8c88998269 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/codec/MySQLPacketCodecEngineTest.java
@@ -112,13 +112,13 @@ class MySQLPacketCodecEngineTest {
@Test
void assertDecodePacketMoreThan16MB() {
MySQLPacketCodecEngine engine = new MySQLPacketCodecEngine();
- when(context.alloc().compositeBuffer(2)).thenReturn(new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 2));
+ when(context.alloc().compositeBuffer(3)).thenReturn(new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 3));
List<Object> actual = new ArrayList<>(1);
for (ByteBuf each : preparePacketMoreThan16MB()) {
engine.decode(context, each, actual);
}
assertThat(actual.size(), is(1));
- assertThat(((ByteBuf) actual.get(0)).readableBytes(), is((1 << 24) - 1));
+ assertThat(((ByteBuf) actual.get(0)).readableBytes(), is(1 << 24));
}
private List<ByteBuf> preparePacketMoreThan16MB() {
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandlerTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandlerTest.java
new file mode 100644
index 00000000000..17f422896f3
--- /dev/null
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/netty/MySQLSequenceIDInboundHandlerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.mysql.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.shardingsphere.db.protocol.event.WriteCompleteEvent;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import org.apache.shardingsphere.db.protocol.netty.ProxyFlowControlHandler;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class MySQLSequenceIDInboundHandlerTest {
+
+ @Test
+ void assertChannelReadWithFlowControl() {
+ EmbeddedChannel channel = new EmbeddedChannel(new FixtureOutboundHandler(), new ProxyFlowControlHandler(), new MySQLSequenceIDInboundHandler(), new FixtureInboundHandler());
+ channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger());
+ channel.writeInbound(Unpooled.wrappedBuffer(new byte[1]), Unpooled.wrappedBuffer(new byte[1]), Unpooled.wrappedBuffer(new byte[1])); // three inbound messages, each a single zero byte, i.e. sequence ID 0
+ assertThat(channel.<ByteBuf>readOutbound().readUnsignedByte(), is((short) 1)); // each of the three outbound buffers is expected to hold sequence ID 1
+ assertThat(channel.<ByteBuf>readOutbound().readUnsignedByte(), is((short) 1));
+ assertThat(channel.<ByteBuf>readOutbound().readUnsignedByte(), is((short) 1));
+ }
+
+ private static class FixtureOutboundHandler extends ChannelOutboundHandlerAdapter {
+
+ @Override
+ public void write(final ChannelHandlerContext context, final Object msg, final ChannelPromise promise) { // msg and promise are not used by this fixture
+ byte sequenceId = (byte) context.channel().attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().getAndIncrement(); // take the current sequence ID, then increment the attribute
+ context.writeAndFlush(Unpooled.wrappedBuffer(new byte[]{sequenceId})); // the outbound buffer is one byte holding that sequence ID
+ }
+ }
+
+ private static class FixtureInboundHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(final ChannelHandlerContext context, final Object msg) {
+ context.channel().config().setAutoRead(false); // turn auto read off before the response is scheduled
+ context.executor().execute(() -> { // the response is submitted to the context's executor instead of being written inline
+ context.writeAndFlush(Unpooled.EMPTY_BUFFER);
+ context.channel().pipeline().fireUserEventTriggered(WriteCompleteEvent.getInstance()); // ProxyFlowControlHandler turns auto read back on when it sees this event
+ });
+ }
+ }
+}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index c5a0dac580f..44a5a0ea659 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -43,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLNe
import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import org.apache.shardingsphere.db.protocol.mysql.netty.MySQLSequenceIDInboundHandler;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
@@ -101,6 +102,7 @@ public final class MySQLClient {
socketChannel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger());
socketChannel.pipeline().addLast(new ChannelAttrInitializer());
socketChannel.pipeline().addLast(new PacketCodec(new MySQLPacketCodecEngine()));
+ socketChannel.pipeline().addLast(new MySQLSequenceIDInboundHandler());
socketChannel.pipeline().addLast(new MySQLNegotiatePackageDecoder());
socketChannel.pipeline().addLast(new MySQLCommandPacketDecoder());
socketChannel.pipeline().addLast(new MySQLNegotiateHandler(connectInfo.getUsername(), connectInfo.getPassword(), responseCallback));
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index c22f7c884f8..de85ab0fef5 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
+import org.apache.shardingsphere.db.protocol.event.WriteCompleteEvent;
import org.apache.shardingsphere.db.protocol.packet.CommandPacket;
import org.apache.shardingsphere.db.protocol.packet.CommandPacketType;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
@@ -95,6 +96,7 @@ public final class CommandExecutorTask implements Runnable {
context.flush();
}
processClosedExceptions(exceptions);
+ context.pipeline().fireUserEventTriggered(WriteCompleteEvent.getInstance());
if (sqlShowEnabled) {
clearLogMDC();
}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
index 6f9b275d477..e6a0e222c31 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
@@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
import org.apache.shardingsphere.db.protocol.netty.ChannelAttrInitializer;
+import org.apache.shardingsphere.db.protocol.netty.ProxyFlowControlHandler;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
@@ -40,11 +41,12 @@ public final class ServerHandlerInitializer extends ChannelInitializer<Channel>
@Override
protected void initChannel(final Channel socketChannel) {
DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine = TypedSPILoader.getService(DatabaseProtocolFrontendEngine.class, databaseType.getType());
- databaseProtocolFrontendEngine.initChannel(socketChannel);
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ChannelAttrInitializer());
pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
pipeline.addLast(new FrontendChannelLimitationInboundHandler(databaseProtocolFrontendEngine));
- pipeline.addLast(new FrontendChannelInboundHandler(databaseProtocolFrontendEngine, socketChannel));
+ pipeline.addLast(ProxyFlowControlHandler.class.getSimpleName(), new ProxyFlowControlHandler());
+ pipeline.addLast(FrontendChannelInboundHandler.class.getSimpleName(), new FrontendChannelInboundHandler(databaseProtocolFrontendEngine, socketChannel));
+ databaseProtocolFrontendEngine.initChannel(socketChannel);
}
}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
index cecac1a52bb..91003559872 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
@@ -40,6 +40,7 @@ public final class OKProxyState implements ProxyState {
public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final ConnectionSession connectionSession) {
CommandExecutorTask commandExecutorTask = new CommandExecutorTask(databaseProtocolFrontendEngine, connectionSession, context, message);
ExecutorService executorService = determineSuitableExecutorService(context, message, databaseProtocolFrontendEngine, connectionSession);
+ context.channel().config().setAutoRead(false);
executorService.execute(commandExecutorTask);
}
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializerTest.java
index eccf20e940a..015aff474a7 100644
--- a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializerTest.java
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializerTest.java
@@ -21,6 +21,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
import org.apache.shardingsphere.db.protocol.netty.ChannelAttrInitializer;
+import org.apache.shardingsphere.db.protocol.netty.ProxyFlowControlHandler;
import org.apache.shardingsphere.test.fixture.infra.database.type.MockedDatabaseType;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.ConstructionMockSettings;
@@ -28,6 +29,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -45,6 +47,7 @@ class ServerHandlerInitializerTest {
verify(pipeline).addLast(any(ChannelAttrInitializer.class));
verify(pipeline).addLast(any(PacketCodec.class));
verify(pipeline).addLast(any(FrontendChannelLimitationInboundHandler.class));
- verify(pipeline).addLast(any(FrontendChannelInboundHandler.class));
+ verify(pipeline).addLast(eq(ProxyFlowControlHandler.class.getSimpleName()), any(ProxyFlowControlHandler.class));
+ verify(pipeline).addLast(eq(FrontendChannelInboundHandler.class.getSimpleName()), any(FrontendChannelInboundHandler.class));
}
}
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyStateTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyStateTest.java
index 0454002c793..bdfa02fc25b 100644
--- a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyStateTest.java
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyStateTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.proxy.frontend.state.impl;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.concurrent.EventExecutor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.config.props.BackendExecutorType;
@@ -31,6 +32,8 @@ import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
import org.apache.shardingsphere.transaction.api.TransactionType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.internal.configuration.plugins.Plugins;
@@ -48,6 +51,19 @@ import static org.mockito.Mockito.when;
@StaticMockSettings(ProxyContext.class)
class OKProxyStateTest {
+ private ChannelHandlerContext context;
+
+ @BeforeEach
+ void setup() {
+     // Build the mock in a local first, stub channel() with a fresh EmbeddedChannel, then publish it to the field.
+     ChannelHandlerContext mockedContext = mock(ChannelHandlerContext.class);
+     when(mockedContext.channel()).thenReturn(new EmbeddedChannel());
+     context = mockedContext;
+ }
+
+ @AfterEach
+ void tearDown() {
+ context.channel().close().syncUninterruptibly(); // context.channel() is stubbed in setup() to return an EmbeddedChannel; close it and wait for the close to finish
+ }
+
@Test
void assertExecuteWithProxyHintEnabled() {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
@@ -56,7 +72,7 @@ class OKProxyStateTest {
ConnectionSession connectionSession = mock(ConnectionSession.class, RETURNS_DEEP_STUBS);
when(connectionSession.getConnectionId()).thenReturn(1);
ExecutorService executorService = registerMockExecutorService(1);
- new OKProxyState().execute(mock(ChannelHandlerContext.class), null, mock(DatabaseProtocolFrontendEngine.class), connectionSession);
+ new OKProxyState().execute(context, null, mock(DatabaseProtocolFrontendEngine.class), connectionSession);
verify(executorService).execute(any(CommandExecutorTask.class));
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(1);
}
@@ -70,7 +86,7 @@ class OKProxyStateTest {
when(connectionSession.getTransactionStatus().getTransactionType()).thenReturn(TransactionType.XA);
when(connectionSession.getConnectionId()).thenReturn(1);
ExecutorService executorService = registerMockExecutorService(1);
- new OKProxyState().execute(mock(ChannelHandlerContext.class), null, mock(DatabaseProtocolFrontendEngine.class), connectionSession);
+ new OKProxyState().execute(context, null, mock(DatabaseProtocolFrontendEngine.class), connectionSession);
verify(executorService).execute(any(CommandExecutorTask.class));
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(1);
}
@@ -83,7 +99,6 @@ class OKProxyStateTest {
ConfigurationPropertyKey.PROXY_BACKEND_EXECUTOR_SUITABLE)).thenReturn(BackendExecutorType.OLTP);
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
EventExecutor eventExecutor = mock(EventExecutor.class);
- ChannelHandlerContext context = mock(ChannelHandlerContext.class);
when(context.executor()).thenReturn(eventExecutor);
new OKProxyState().execute(context, null, mock(DatabaseProtocolFrontendEngine.class), mock(ConnectionSession.class, RETURNS_DEEP_STUBS));
verify(eventExecutor).execute(any(CommandExecutorTask.class));
@@ -101,7 +116,7 @@ class OKProxyStateTest {
DatabaseProtocolFrontendEngine frontendEngine = mock(DatabaseProtocolFrontendEngine.class, RETURNS_DEEP_STUBS);
when(frontendEngine.getFrontendContext().isRequiredSameThreadForConnection(null)).thenReturn(true);
ExecutorService executorService = registerMockExecutorService(1);
- new OKProxyState().execute(mock(ChannelHandlerContext.class), null, frontendEngine, connectionSession);
+ new OKProxyState().execute(context, null, frontendEngine, connectionSession);
verify(executorService).execute(any(CommandExecutorTask.class));
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(1);
}
diff --git a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
index ea9617241f5..4debe8e9ee7 100644
--- a/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
+++ b/proxy/frontend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
@@ -33,6 +33,8 @@ import org.apache.shardingsphere.proxy.frontend.context.FrontendContext;
import org.apache.shardingsphere.proxy.frontend.mysql.authentication.MySQLAuthenticationEngine;
import org.apache.shardingsphere.proxy.frontend.mysql.command.MySQLCommandExecuteEngine;
import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
+import org.apache.shardingsphere.db.protocol.mysql.netty.MySQLSequenceIDInboundHandler;
+import org.apache.shardingsphere.proxy.frontend.netty.FrontendChannelInboundHandler;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import java.util.concurrent.atomic.AtomicInteger;
@@ -59,6 +61,7 @@ public final class MySQLFrontendEngine implements DatabaseProtocolFrontendEngine
@Override
public void initChannel(final Channel channel) {
channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger());
+ channel.pipeline().addBefore(FrontendChannelInboundHandler.class.getSimpleName(), MySQLSequenceIDInboundHandler.class.getSimpleName(), new MySQLSequenceIDInboundHandler());
}
@Override