You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/12/30 07:38:26 UTC

[shardingsphere] branch master updated: Init MySQL sequence ID when channel initializing (#23192)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f22250a04 Init MySQL sequence ID when channel initializing (#23192)
d8f22250a04 is described below

commit d8f22250a04617eb3c9b19d9bf2fd134a35eac30
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Fri Dec 30 15:38:20 2022 +0800

    Init MySQL sequence ID when channel initializing (#23192)
---
 .../frontend/executor/ConnectionThreadExecutorGroup.java      |  3 +++
 .../proxy/frontend/netty/ServerHandlerInitializer.java        |  1 +
 .../proxy/frontend/mysql/MySQLFrontendEngine.java             | 11 ++++++++++-
 .../mysql/authentication/MySQLAuthenticationEngine.java       |  6 ++----
 .../proxy/frontend/mysql/MySQLFrontendEngineTest.java         |  8 ++++++++
 .../proxy/frontend/spi/DatabaseProtocolFrontendEngine.java    |  9 +++++++++
 6 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
index 4731bc7355d..d89d7bdd7c5 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/ConnectionThreadExecutorGroup.java
@@ -81,6 +81,9 @@ public final class ConnectionThreadExecutorGroup {
      */
     public void unregisterAndAwaitTermination(final int connectionId) {
         ExecutorService executorService = executorServices.remove(connectionId);
+        if (null == executorService) {
+            return;
+        }
         executorService.shutdown();
         try {
             executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
index c2fc6963999..358bed65e39 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/ServerHandlerInitializer.java
@@ -40,6 +40,7 @@ public final class ServerHandlerInitializer extends ChannelInitializer<SocketCha
     @Override
     protected void initChannel(final SocketChannel socketChannel) {
         DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine = TypedSPIRegistry.getRegisteredService(DatabaseProtocolFrontendEngine.class, databaseType.getType(), new Properties());
+        databaseProtocolFrontendEngine.initChannel(socketChannel);
         ChannelPipeline pipeline = socketChannel.pipeline();
         pipeline.addLast(new ChannelAttrInitializer());
         pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
diff --git a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
index 57fe1b0d083..ea9617241f5 100644
--- a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
+++ b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
@@ -17,12 +17,13 @@
 
 package org.apache.shardingsphere.proxy.frontend.mysql;
 
+import io.netty.channel.Channel;
 import lombok.Getter;
 import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
 import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerInfo;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -31,8 +32,11 @@ import org.apache.shardingsphere.proxy.frontend.command.CommandExecuteEngine;
 import org.apache.shardingsphere.proxy.frontend.context.FrontendContext;
 import org.apache.shardingsphere.proxy.frontend.mysql.authentication.MySQLAuthenticationEngine;
 import org.apache.shardingsphere.proxy.frontend.mysql.command.MySQLCommandExecuteEngine;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Frontend engine for MySQL.
  */
@@ -52,6 +56,11 @@ public final class MySQLFrontendEngine implements DatabaseProtocolFrontendEngine
                 .getContextManager().getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.PROXY_MYSQL_DEFAULT_VERSION));
     }
     
+    @Override
+    public void initChannel(final Channel channel) {
+        channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger()); // attach a new zero-initialized sequence ID counter to this channel
+    }
+    
     @Override
     public void setDatabaseVersion(final String databaseName, final String databaseVersion) {
         MySQLServerInfo.setServerVersion(databaseName, databaseVersion);
diff --git a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
index 4d5b4a4c241..3db3cb9886f 100644
--- a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
+++ b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
@@ -24,9 +24,7 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCapabilityFlag;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConnectionPhase;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
-import org.apache.shardingsphere.dialect.mysql.vendor.MySQLVendorError;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLStatusFlag;
-import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLAuthSwitchRequestPacket;
@@ -35,17 +33,18 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandsha
 import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandshakeResponse41Packet;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
+import org.apache.shardingsphere.dialect.mysql.vendor.MySQLVendorError;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationEngine;
 import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationResult;
 import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationResultBuilder;
 import org.apache.shardingsphere.proxy.frontend.connection.ConnectionIdGenerator;
 import org.apache.shardingsphere.proxy.frontend.mysql.authentication.authenticator.MySQLAuthenticator;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Authentication engine for MySQL.
@@ -64,7 +63,6 @@ public final class MySQLAuthenticationEngine implements AuthenticationEngine {
     
     @Override
     public int handshake(final ChannelHandlerContext context) {
-        context.channel().attr(MySQLConstants.MYSQL_SEQUENCE_ID).set(new AtomicInteger());
         int result = ConnectionIdGenerator.getInstance().nextId();
         connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH;
         context.writeAndFlush(new MySQLHandshakePacket(result, authenticationHandler.getAuthPluginData()));
diff --git a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
index b3344b05f56..635c6d3ae78 100644
--- a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
+++ b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngineTest.java
@@ -60,12 +60,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -106,6 +108,12 @@ public final class MySQLFrontendEngineTest extends ProxyContextRestorer {
         mysqlFrontendEngine = new MySQLFrontendEngine();
     }
     
+    @Test
+    public void assertInitChannel() {
+        mysqlFrontendEngine.initChannel(channel);
+        verify(channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID)).set(any(AtomicInteger.class)); // checks only that some AtomicInteger was stored, not its value
+    }
+    
     @Test
     public void assertHandshake() {
         assertTrue(mysqlFrontendEngine.getAuthenticationEngine().handshake(context) > 0);
diff --git a/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java b/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
index 6fab06399a2..2c79559bd0c 100644
--- a/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
+++ b/proxy/frontend/spi/src/main/java/org/apache/shardingsphere/proxy/frontend/spi/DatabaseProtocolFrontendEngine.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.proxy.frontend.spi;
 
+import io.netty.channel.Channel;
 import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationEngine;
@@ -29,6 +30,14 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
  */
 public interface DatabaseProtocolFrontendEngine extends TypedSPI {
     
+    /**
+     * Initialize channel. The default implementation does nothing; a protocol engine may override it to set channel attributes.
+     *
+     * @param channel channel to be initialized
+     */
+    default void initChannel(Channel channel) {
+    }
+    
     /**
      * Set database version.
      *