You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/10 11:04:15 UTC
[shardingsphere] branch master updated: mysql client add reconnect
function where active is inactive (#6323)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9481645 mysql client add reconnect function where active is inactive (#6323)
9481645 is described below
commit 9481645cbed4411407586e7f63190ca8e98cd446
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Jul 10 19:03:58 2020 +0800
mysql client add reconnect function where active is inactive (#6323)
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/mysql/MySQLBinlogDumper.java | 3 +-
.../scaling/mysql/client/ConnectInfo.java | 36 ++++++++++
.../scaling/mysql/client/MySQLClient.java | 80 +++++++++++++---------
.../scaling/mysql/client/MySQLClientTest.java | 7 +-
4 files changed, 92 insertions(+), 34 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index f1fc7b8..b1bb0e4 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Placeholde
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
+import org.apache.shardingsphere.scaling.mysql.client.ConnectInfo;
import org.apache.shardingsphere.scaling.mysql.client.MySQLClient;
import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractRowsEvent;
@@ -83,7 +84,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor imp
public void dump(final Channel channel) {
JDBCDataSourceConfiguration jdbcDataSourceConfiguration = (JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration();
final JdbcUri uri = new JdbcUri(jdbcDataSourceConfiguration.getJdbcUrl());
- MySQLClient client = new MySQLClient(random.nextInt(), uri.getHostname(), uri.getPort(), jdbcDataSourceConfiguration.getUsername(), jdbcDataSourceConfiguration.getPassword());
+ MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), uri.getHostname(), uri.getPort(), jdbcDataSourceConfiguration.getUsername(), jdbcDataSourceConfiguration.getPassword()));
client.connect();
client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
while (isRunning()) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/ConnectInfo.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/ConnectInfo.java
new file mode 100644
index 0000000..6e689dd
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/ConnectInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.mysql.client;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+@Getter
+public class ConnectInfo {
+
+ private final int serverId;
+
+ private final String host;
+
+ private final int port;
+
+ private final String username;
+
+ private final String password;
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java
index d483e08..87e24b8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java
@@ -17,18 +17,6 @@
package org.apache.shardingsphere.scaling.mysql.client;
-import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
-import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
-import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLNegotiateHandler;
-import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLBinlogEventPacketDecoder;
-import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLCommandPacketDecoder;
-import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent;
-
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@@ -43,6 +31,17 @@ import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
+import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
+import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent;
+import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLBinlogEventPacketDecoder;
+import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLCommandPacketDecoder;
+import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLNegotiateHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue;
@@ -56,17 +55,9 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public final class MySQLClient {
- private final int serverId;
-
- private final String host;
-
- private final int port;
+ private final ConnectInfo connectInfo;
- private final String username;
-
- private final String password;
-
- private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
+ private EventLoopGroup eventLoopGroup;
private Channel channel;
@@ -80,21 +71,21 @@ public final class MySQLClient {
* Connect to MySQL.
*/
public synchronized void connect() {
+ eventLoopGroup = new NioEventLoopGroup(1);
responseCallback = new DefaultPromise<>(eventLoopGroup.next());
channel = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
+ .option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new PacketCodec(new MySQLPacketCodecEngine()));
socketChannel.pipeline().addLast(new MySQLCommandPacketDecoder());
- socketChannel.pipeline().addLast(new MySQLNegotiateHandler(username, password, responseCallback));
+ socketChannel.pipeline().addLast(new MySQLNegotiateHandler(connectInfo.getUsername(), connectInfo.getPassword(), responseCallback));
socketChannel.pipeline().addLast(new MySQLCommandResponseHandler());
}
- })
- .option(ChannelOption.AUTO_READ, true)
- .connect(host, port).channel();
+ }).connect(connectInfo.getHost(), connectInfo.getPort()).channel();
serverInfo = waitExpectedResponse(ServerInfo.class);
}
@@ -158,7 +149,8 @@ public final class MySQLClient {
private void registerSlave() {
responseCallback = new DefaultPromise<>(eventLoopGroup.next());
InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
- MySQLComRegisterSlaveCommandPacket registerSlaveCommandPacket = new MySQLComRegisterSlaveCommandPacket(serverId, localAddress.getHostName(), username, password, localAddress.getPort());
+ MySQLComRegisterSlaveCommandPacket registerSlaveCommandPacket = new MySQLComRegisterSlaveCommandPacket(
+ connectInfo.getServerId(), localAddress.getHostName(), connectInfo.getUsername(), connectInfo.getPassword(), localAddress.getPort());
channel.writeAndFlush(registerSlaveCommandPacket);
waitExpectedResponse(MySQLOKPacket.class);
}
@@ -185,9 +177,9 @@ public final class MySQLClient {
channel.pipeline().remove(MySQLCommandResponseHandler.class);
channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder(checksumLength));
channel.pipeline().addLast(new MySQLBinlogEventHandler());
- channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, serverId, binlogFileName));
+ channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName));
}
-
+
/**
* Poll binlog event.
*
@@ -220,7 +212,7 @@ public final class MySQLClient {
}
}
- class MySQLCommandResponseHandler extends ChannelInboundHandlerAdapter {
+ private class MySQLCommandResponseHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
@@ -238,18 +230,42 @@ public final class MySQLClient {
}
}
- class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {
+ private class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {
+
+ private AbstractBinlogEvent lastBinlogEvent;
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (msg instanceof AbstractBinlogEvent) {
- blockingEventQueue.put((AbstractBinlogEvent) msg);
+ lastBinlogEvent = (AbstractBinlogEvent) msg;
+ blockingEventQueue.put(lastBinlogEvent);
}
}
@Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ log.warn("channel inactive");
+ reconnect();
+ }
+
+ @Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
log.error("protocol resolution error", cause);
+ reconnect();
+ }
+
+ private void reconnect() {
+ log.info("reconnect mysql client.");
+ closeOldChannel();
+ connect();
+ subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
+ }
+
+ private void closeOldChannel() {
+ try {
+ channel.closeFuture().sync();
+ } catch (InterruptedException ignored) {
+ }
}
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java
index 1bb7b84..6aa0322 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.mysql.client;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Promise;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
@@ -56,7 +57,7 @@ public final class MySQLClientTest {
@Before
public void setUp() {
- mysqlClient = new MySQLClient(1, "host", 3306, "username", "password");
+ mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306, "username", "password"));
when(channel.pipeline()).thenReturn(pipeline);
inetSocketAddress = new InetSocketAddress("host", 3306);
when(channel.localAddress()).thenReturn(inetSocketAddress);
@@ -75,6 +76,7 @@ public final class MySQLClientTest {
public void assertExecute() throws NoSuchFieldException, IllegalAccessException {
mockChannelResponse(new MySQLOKPacket(0));
ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel);
+ ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1));
assertTrue(mysqlClient.execute(""));
verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class));
}
@@ -85,6 +87,7 @@ public final class MySQLClientTest {
ReflectionUtil.setFieldValueToClass(expected, "affectedRows", 10);
mockChannelResponse(expected);
ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel);
+ ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1));
assertThat(mysqlClient.executeUpdate(""), is(10));
verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class));
}
@@ -94,6 +97,7 @@ public final class MySQLClientTest {
InternalResultSet expected = new InternalResultSet(null);
mockChannelResponse(expected);
ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel);
+ ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1));
assertThat(mysqlClient.executeQuery(""), is(expected));
verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class));
}
@@ -104,6 +108,7 @@ public final class MySQLClientTest {
serverInfo.setServerVersion(new ServerVersion("5.5.0-log"));
ReflectionUtil.setFieldValueToClass(mysqlClient, "serverInfo", serverInfo);
ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel);
+ ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1));
mockChannelResponse(new MySQLOKPacket(0));
mysqlClient.subscribe("", 4L);
verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComRegisterSlaveCommandPacket.class));