You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/12/30 07:38:26 UTC
[shardingsphere] branch master updated: Init MySQL sequence ID when channel initializing (#23192)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d8f22250a04 Init MySQL sequence ID when channel initializing (#23192)
d8f22250a04 is described below
commit d8f22250a04617eb3c9b19d9bf2fd134a35eac30
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Fri Dec 30 15:38:20 2022 +0800
Init MySQL sequence ID when channel initializing (#23192)
---
.../frontend/executor/ConnectionThreadExecutorGroup.java | 3 +++
.../proxy/frontend/netty/ServerHandlerInitializer.java | 1 +
.../proxy/frontend/mysql/MySQLFrontendEngine.java | 11 ++++++++++-
.../mysql/authentication/MySQLAuthenticationEngine.java | 6 ++----
.../proxy/frontend/mysql/MySQLFrontendEngineTest.java | 8 ++++++++
.../proxy/frontend/spi/DatabaseProtocolFrontendEngine.java | 9 +++++++++
6 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
index 4731bc7355d..d89d7bdd7c5 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
@@ -81,6 +81,9 @@ public final class ConnectionThreadExecutorGroup {
*/
public void unregisterAndAwaitTermination(final int connectionId) {
ExecutorService executorService = executorServices.remove(connectionId);
+ if (null == executorService) {
+ return;
+ }
executorService.shutdown();
try {
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
index c2fc6963999..358bed65e39 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
@@ -40,6 +40,7 @@ public final class ServerHandlerInitializer extends ChannelInitializer<SocketCha
@Override
protected void initChannel(final SocketChannel socketChannel) {
DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine = TypedSPIRegistry.getRegisteredService(DatabaseProtocolFrontendEngine.class, databaseType.getType(), new Properties());
+ databaseProtocolFrontendEngine.initChannel(socketChannel);
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ChannelAttrInitializer());
pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
diff --git a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
index 57fe1b0d083..ea9617241f5 100644
--- a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
+++ b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
@@ -17,12 +17,13 @@
package org.apache.shardingsphere.proxy.frontend.mysql;
+import io.netty.channel.Channel;
import lombok.Getter;
import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerInfo;
import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -31,8 +32,11 @@ import org.apache.shardingsphere.proxy.frontend.command.CommandExecuteEngine;
import org.apache.shardingsphere.proxy.frontend.context.FrontendContext;
import org.apache.shardingsphere.proxy.frontend.mysql.authentication.MySQLAuthenticationEngine;
import org.apache.shardingsphere.proxy.frontend.mysql.command.MySQLCommandExecuteEngine;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Frontend engine for MySQL.
*/
@@ -52,6 +56,11 @@ public final class MySQLFrontendEngine implements DatabaseProtocolFrontendEngine
.getContextManager().getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.PROXY_MYSQL_DEFAULT_VERSION));
}
+ @Override
+ public void initChannel(final Channel channel) {
+ channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger());
+ }
+
@Override
public void setDatabaseVersion(final String databaseName, final String databaseVersion) {
MySQLServerInfo.setServerVersion(databaseName, databaseVersion);
diff --git a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
index 4d5b4a4c241..3db3cb9886f 100644
--- a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
+++ b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
@@ -24,9 +24,7 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCapabilityFlag;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConnectionPhase;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
-import org.apache.shardingsphere.dialect.mysql.vendor.MySQLVendorError;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLStatusFlag;
-import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLAuthSwitchRequestPacket;
@@ -35,17 +33,18 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandsha
import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandshakeResponse41Packet;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
+import org.apache.shardingsphere.dialect.mysql.vendor.MySQLVendorError;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationEngine;
import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationResult;
import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationResultBuilder;
import org.apache.shardingsphere.proxy.frontend.connection.ConnectionIdGenerator;
import org.apache.shardingsphere.proxy.frontend.mysql.authentication.authenticator.MySQLAuthenticator;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Authentication engine for MySQL.
@@ -64,7 +63,6 @@ public final class MySQLAuthenticationEngine implements AuthenticationEngine {
@Override
public int handshake(final ChannelHandlerContext context) {
- context.channel().attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger());
int result = ConnectionIdGenerator.getInstance().nextId();
connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH;
context.writeAndFlush(new MySQLHandshakePacket(result, authenticationHandler.getAuthPluginData()));
diff --git a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
index b3344b05f56..635c6d3ae78 100644
--- a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
+++ b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
@@ -60,12 +60,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -106,6 +108,12 @@ public final class MySQLFrontendEngineTest extends ProxyContextRestorer {
mysqlFrontendEngine = new MySQLFrontendEngine();
}
+ @Test
+ public void assertInitChannel() {
+ mysqlFrontendEngine.initChannel(channel);
+ verify(channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID)).set(any(AtomicInteger.class));
+ }
+
@Test
public void assertHandshake() {
assertTrue(mysqlFrontendEngine.getAuthenticationEngine().handshake(context) > 0);
diff --git a/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java b/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
index 6fab06399a2..2c79559bd0c 100644
--- a/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
+++ b/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.proxy.frontend.spi;
+import io.netty.channel.Channel;
import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationEngine;
@@ -29,6 +30,14 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
*/
public interface DatabaseProtocolFrontendEngine extends TypedSPI {
+ /**
+ * Initialize channel.
+ *
+ * @param channel channel
+ */
+ default void initChannel(Channel channel) {
+ }
+
/**
* Set database version.
*