You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/09/09 00:34:24 UTC

[shardingsphere] branch master updated: Improve MySQL client thread safety (#20890)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d573da5225 Improve MySQL client thread safety (#20890)
3d573da5225 is described below

commit 3d573da5225c652e7f42f039aa12a61f1d74a1b7
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Sep 9 08:34:15 2022 +0800

    Improve MySQL client thread safety (#20890)
---
 .../data/pipeline/mysql/ingest/binlog/BinlogContext.java    |  2 +-
 .../data/pipeline/mysql/ingest/client/ServerInfo.java       |  2 +-
 .../data/pipeline/mysql/ingest/client/ServerVersion.java    | 13 ++++++++++---
 .../ingest/client/netty/MySQLCommandPacketDecoder.java      |  4 ++--
 .../mysql/ingest/client/netty/MySQLNegotiateHandler.java    |  6 +++---
 .../ingest/client/netty/MySQLNegotiatePackageDecoder.java   |  2 +-
 6 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
index 9c64b573293..771f4662615 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
@@ -34,7 +34,7 @@ import java.util.Map;
 public final class BinlogContext {
     
     @Setter
-    private String fileName;
+    private volatile String fileName;
     
     private final int checksumLength;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
index b31f142e649..07fcdf17605 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
@@ -27,5 +27,5 @@ import lombok.Setter;
 @Setter
 public final class ServerInfo {
     
-    private ServerVersion serverVersion;
+    private volatile ServerVersion serverVersion;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
index 0ca5b8528ce..16285950169 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -26,15 +27,16 @@ import java.util.regex.Pattern;
  * Server Version.
  */
 @Getter
+@Slf4j
 public final class ServerVersion {
     
     private static final Pattern VERSION_PATTERN = Pattern.compile("^(\\d+)\\.(\\d+)\\.(\\d+).*");
     
-    private int major;
+    private final int major;
     
-    private int minor;
+    private final int minor;
     
-    private int series;
+    private final int series;
     
     public ServerVersion(final String version) {
         Matcher matcher = VERSION_PATTERN.matcher(version);
@@ -42,6 +44,11 @@ public final class ServerVersion {
             major = Short.parseShort(matcher.group(1));
             minor = Short.parseShort(matcher.group(2));
             series = Short.parseShort(matcher.group(3));
+        } else {
+            log.info("Could not match MySQL server version {}", version);
+            major = 0;
+            minor = 0;
+            series = 0;
         }
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
index fcd32b1b39e..d9ca263dde1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
@@ -41,9 +41,9 @@ public final class MySQLCommandPacketDecoder extends ByteToMessageDecoder {
         ResponsePacket, FieldPacket, RowDataPacket
     }
     
-    private States currentState = States.ResponsePacket;
+    private volatile States currentState = States.ResponsePacket;
     
-    private InternalResultSet internalResultSet;
+    private volatile InternalResultSet internalResultSet;
     
     @Override
     protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
index b4990095864..b73961a9391 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
@@ -57,11 +57,11 @@ public final class MySQLNegotiateHandler extends ChannelInboundHandlerAdapter {
     
     private final Promise<Object> authResultCallback;
     
-    private ServerInfo serverInfo;
+    private volatile ServerInfo serverInfo;
     
-    private byte[] seed;
+    private volatile byte[] seed;
     
-    private boolean publicKeyRequested;
+    private volatile boolean publicKeyRequested;
     
     @SneakyThrows
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiatePackageDecoder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiatePackageDecoder.java
index c108d8629eb..05c6f2de15a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiatePackageDecoder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiatePackageDecoder.java
@@ -37,7 +37,7 @@ import java.util.List;
  */
 public final class MySQLNegotiatePackageDecoder extends ByteToMessageDecoder {
     
-    private boolean handshakeReceived;
+    private volatile boolean handshakeReceived;
     
     @Override
     protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {