You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/08 11:46:53 UTC
[shardingsphere] branch master updated: Improve MySQL incremental client reconnect and close (#22740)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8bfa611ac9b Improve MySQL incremental client reconnect and close (#22740)
8bfa611ac9b is described below
commit 8bfa611ac9b524313ab11d1b95435fc6aee4cdc5
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Dec 8 19:46:40 2022 +0800
Improve MySQL incremental client reconnect and close (#22740)
* Improve MySQL incremental client reconnect and close
* Fix codestyle
* Improve
* Optimize code readability
---
.../pipeline/mysql/ingest/client/MySQLClient.java | 29 ++++++++++++++--------
.../mysql/ingest/client/PasswordEncryption.java | 4 +--
.../ingest/client/PasswordEncryptionTest.java | 2 +-
3 files changed, 21 insertions(+), 14 deletions(-)
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 63f8e8ad98d..25bdb8b066a 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -29,11 +29,13 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
+import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.exception.job.BinlogSyncChannelAlreadyClosedException;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.GlobalTableMapEventMapping;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLCommandPacketDecoder;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLNegotiateHandler;
@@ -104,6 +106,8 @@ public final class MySQLClient {
}
}).connect(connectInfo.getHost(), connectInfo.getPort()).channel();
serverInfo = waitExpectedResponse(ServerInfo.class);
+ reconnectTimes.set(0);
+ running = true;
}
/**
@@ -156,7 +160,6 @@ public final class MySQLClient {
registerSlave();
dumpBinlog(binlogFileName, binlogPosition, queryChecksumLength());
log.info("subscribe binlog file: {}, position: {}", binlogFileName, binlogPosition);
- reconnectTimes.set(0);
}
private void initDumpConnectSession() {
@@ -197,10 +200,17 @@ public final class MySQLClient {
channel.pipeline().remove(MySQLCommandResponseHandler.class);
String tableKey = String.join(":", connectInfo.getHost(), String.valueOf(connectInfo.getPort()));
channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder(checksumLength, GlobalTableMapEventMapping.getTableMapEventMap(tableKey)));
- channel.pipeline().addLast(new MySQLBinlogEventHandler());
+ channel.pipeline().addLast(new MySQLBinlogEventHandler(getLastBinlogEvent(binlogFileName, binlogPosition)));
channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName));
}
+ private AbstractBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) {
+ PlaceholderEvent result = new PlaceholderEvent();
+ result.setFileName(binlogFileName);
+ result.setPosition(binlogPosition);
+ return result;
+ }
+
/**
* Poll binlog event.
*
@@ -242,6 +252,7 @@ public final class MySQLClient {
return;
}
try {
+ running = false;
channel.close().sync();
} catch (final InterruptedException ex) {
log.error("close channel interrupted", ex);
@@ -266,9 +277,10 @@ public final class MySQLClient {
}
}
+ @AllArgsConstructor
private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {
- private AbstractBinlogEvent lastBinlogEvent;
+ private volatile AbstractBinlogEvent lastBinlogEvent;
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
@@ -291,25 +303,22 @@ public final class MySQLClient {
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
- running = false;
String fileName = null == lastBinlogEvent ? null : lastBinlogEvent.getFileName();
Long position = null == lastBinlogEvent ? null : lastBinlogEvent.getPosition();
log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
+ if (!running) {
+ return;
+ }
reconnect();
}
private void reconnect() {
+ closeChannel();
if (reconnectTimes.get() > 3) {
log.warn("exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
- running = false;
return;
}
reconnectTimes.incrementAndGet();
- if (null == lastBinlogEvent || null == lastBinlogEvent.getFileName()) {
- log.warn("last binlog event is null or the file name is null, last binlog event:{}", lastBinlogEvent);
- return;
- }
- closeChannel();
connect();
subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryption.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryption.java
index 1d1427d77c3..29eb06c86ea 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryption.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryption.java
@@ -106,9 +106,7 @@ public final class PasswordEncryption {
}
private static byte[] formatKey(final String key) {
- int start = key.indexOf("\n") + 1;
- int end = key.lastIndexOf("\n");
- return key.substring(start, end).replace("\n", "").getBytes();
+ return key.replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "").trim().replace("\n", "").getBytes();
}
private static byte[] concatSeed(final MessageDigest messageDigest, final byte[] seed, final byte[] passwordSha1) {
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryptionTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryptionTest.java
index 2d3ce046393..50daea952f9 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryptionTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/PasswordEncryptionTest.java
@@ -72,6 +72,6 @@ public final class PasswordEncryptionTest {
+ "XaUZwZHdXjEme0/D8p8KBXdMipanZXwHdL+LOBSACj3/FwHn+6oZO2k02g80uofs\n"
+ "zFdWMjpPVqVCqe85GRFzEY73wDYEItl0d+9a9OV3FFZqVgC2FLk3cD5qajPtyo6v\n"
+ "UQIDAQAB\n"
- + "-----END PUBLIC KEY-----";
+ + "-----END PUBLIC KEY-----\n";
}
}