You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/08 11:46:53 UTC

[shardingsphere] branch master updated: Improve MySQL incremental client reconnect and close (#22740)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bfa611ac9b Improve MySQL incremental client reconnect and close (#22740)
8bfa611ac9b is described below

commit 8bfa611ac9b524313ab11d1b95435fc6aee4cdc5
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Dec 8 19:46:40 2022 +0800

    Improve MySQL incremental client reconnect and close (#22740)
    
    * Improve MySQL incremental client reconnect and close
    
    * Fix codestyle
    
    * Improve
    
    * Optimize code readability
---
 .../pipeline/mysql/ingest/client/MySQLClient.java  | 29 ++++++++++++++--------
 .../mysql/ingest/client/PasswordEncryption.java    |  4 +--
 .../ingest/client/PasswordEncryptionTest.java      |  2 +-
 3 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 63f8e8ad98d..25bdb8b066a 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -29,11 +29,13 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Promise;
+import lombok.AllArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.BinlogSyncChannelAlreadyClosedException;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.GlobalTableMapEventMapping;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLCommandPacketDecoder;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLNegotiateHandler;
@@ -104,6 +106,8 @@ public final class MySQLClient {
                     }
                 }).connect(connectInfo.getHost(), connectInfo.getPort()).channel();
         serverInfo = waitExpectedResponse(ServerInfo.class);
+        reconnectTimes.set(0);
+        running = true;
     }
     
     /**
@@ -156,7 +160,6 @@ public final class MySQLClient {
         registerSlave();
         dumpBinlog(binlogFileName, binlogPosition, queryChecksumLength());
         log.info("subscribe binlog file: {}, position: {}", binlogFileName, binlogPosition);
-        reconnectTimes.set(0);
     }
     
     private void initDumpConnectSession() {
@@ -197,10 +200,17 @@ public final class MySQLClient {
         channel.pipeline().remove(MySQLCommandResponseHandler.class);
         String tableKey = String.join(":", connectInfo.getHost(), String.valueOf(connectInfo.getPort()));
         channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder(checksumLength, GlobalTableMapEventMapping.getTableMapEventMap(tableKey)));
-        channel.pipeline().addLast(new MySQLBinlogEventHandler());
+        channel.pipeline().addLast(new MySQLBinlogEventHandler(getLastBinlogEvent(binlogFileName, binlogPosition)));
         channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName));
     }
     
+    private AbstractBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) {
+        PlaceholderEvent result = new PlaceholderEvent();
+        result.setFileName(binlogFileName);
+        result.setPosition(binlogPosition);
+        return result;
+    }
+    
     /**
      * Poll binlog event.
      *
@@ -242,6 +252,7 @@ public final class MySQLClient {
             return;
         }
         try {
+            running = false;
             channel.close().sync();
         } catch (final InterruptedException ex) {
             log.error("close channel interrupted", ex);
@@ -266,9 +277,10 @@ public final class MySQLClient {
         }
     }
     
+    @AllArgsConstructor
     private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {
         
-        private AbstractBinlogEvent lastBinlogEvent;
+        private volatile AbstractBinlogEvent lastBinlogEvent;
         
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
@@ -291,25 +303,22 @@ public final class MySQLClient {
         
         @Override
         public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
-            running = false;
             String fileName = null == lastBinlogEvent ? null : lastBinlogEvent.getFileName();
             Long position = null == lastBinlogEvent ? null : lastBinlogEvent.getPosition();
             log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
+            if (!running) {
+                return;
+            }
             reconnect();
         }
         
         private void reconnect() {
+            closeChannel();
             if (reconnectTimes.get() > 3) {
                 log.warn("exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
-                running = false;
                 return;
             }
             reconnectTimes.incrementAndGet();
-            if (null == lastBinlogEvent || null == lastBinlogEvent.getFileName()) {
-                log.warn("last binlog event is null or the file name is null, last binlog event:{}", lastBinlogEvent);
-                return;
-            }
-            closeChannel();
             connect();
             subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
         }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryption.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryption.java
index 1d1427d77c3..29eb06c86ea 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryption.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryption.java
@@ -106,9 +106,7 @@ public final class PasswordEncryption {
     }
     
     private static byte[] formatKey(final String key) {
-        int start = key.indexOf("\n") + 1;
-        int end = key.lastIndexOf("\n");
-        return key.substring(start, end).replace("\n", "").getBytes();
+        return key.replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "").trim().replace("\n", "").getBytes();
     }
     
     private static byte[] concatSeed(final MessageDigest messageDigest, final byte[] seed, final byte[] passwordSha1) {
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryptionTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryptionTest.java
index 2d3ce046393..50daea952f9 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryptionTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryptionTest.java
@@ -72,6 +72,6 @@ public final class PasswordEncryptionTest {
                 + "XaUZwZHdXjEme0/D8p8KBXdMipanZXwHdL+LOBSACj3/FwHn+6oZO2k02g80uofs\n"
                 + "zFdWMjpPVqVCqe85GRFzEY73wDYEItl0d+9a9OV3FFZqVgC2FLk3cD5qajPtyo6v\n"
                 + "UQIDAQAB\n"
-                + "-----END PUBLIC KEY-----";
+                + "-----END PUBLIC KEY-----\n";
     }
 }