You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/07/10 11:04:15 UTC

[shardingsphere] branch master updated: MySQL client: add reconnect function for when the channel becomes inactive (#6323)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9481645  MySQL client: add reconnect function for when the channel becomes inactive (#6323)
9481645 is described below

commit 9481645cbed4411407586e7f63190ca8e98cd446
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Jul 10 19:03:58 2020 +0800

    MySQL client: add reconnect function for when the channel becomes inactive (#6323)
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/mysql/MySQLBinlogDumper.java           |  3 +-
 .../scaling/mysql/client/ConnectInfo.java          | 36 ++++++++++
 .../scaling/mysql/client/MySQLClient.java          | 80 +++++++++++++---------
 .../scaling/mysql/client/MySQLClientTest.java      |  7 +-
 4 files changed, 92 insertions(+), 34 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index f1fc7b8..b1bb0e4 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Placeholde
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
 import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
+import org.apache.shardingsphere.scaling.mysql.client.ConnectInfo;
 import org.apache.shardingsphere.scaling.mysql.client.MySQLClient;
 import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent;
 import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractRowsEvent;
@@ -83,7 +84,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor imp
     public void dump(final Channel channel) {
         JDBCDataSourceConfiguration jdbcDataSourceConfiguration = (JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration();
         final JdbcUri uri = new JdbcUri(jdbcDataSourceConfiguration.getJdbcUrl());
-        MySQLClient client = new MySQLClient(random.nextInt(), uri.getHostname(), uri.getPort(), jdbcDataSourceConfiguration.getUsername(), jdbcDataSourceConfiguration.getPassword());
+        MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), uri.getHostname(), uri.getPort(), jdbcDataSourceConfiguration.getUsername(), jdbcDataSourceConfiguration.getPassword()));
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
         while (isRunning()) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/ConnectInfo.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/ConnectInfo.java
new file mode 100644
index 0000000..6e689dd
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/ConnectInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.mysql.client;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+@Getter
+public class ConnectInfo {
+
+    private final int serverId;
+
+    private final String host;
+
+    private final int port;
+
+    private final String username;
+
+    private final String password;
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java
index d483e08..87e24b8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java
@@ -17,18 +17,6 @@
 
 package org.apache.shardingsphere.scaling.mysql.client;
 
-import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
-import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
-import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLNegotiateHandler;
-import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLBinlogEventPacketDecoder;
-import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLCommandPacketDecoder;
-import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
@@ -43,6 +31,17 @@ import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Promise;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.db.protocol.codec.PacketCodec;
+import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
+import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent;
+import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLBinlogEventPacketDecoder;
+import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLCommandPacketDecoder;
+import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLNegotiateHandler;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -56,17 +55,9 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public final class MySQLClient {
     
-    private final int serverId;
-    
-    private final String host;
-    
-    private final int port;
+    private final ConnectInfo connectInfo;
     
-    private final String username;
-    
-    private final String password;
-    
-    private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
+    private EventLoopGroup eventLoopGroup;
     
     private Channel channel;
     
@@ -80,21 +71,21 @@ public final class MySQLClient {
      * Connect to MySQL.
      */
     public synchronized void connect() {
+        eventLoopGroup = new NioEventLoopGroup(1);
         responseCallback = new DefaultPromise<>(eventLoopGroup.next());
         channel = new Bootstrap()
                 .group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
+                .option(ChannelOption.AUTO_READ, true)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(final SocketChannel socketChannel) {
                         socketChannel.pipeline().addLast(new PacketCodec(new MySQLPacketCodecEngine()));
                         socketChannel.pipeline().addLast(new MySQLCommandPacketDecoder());
-                        socketChannel.pipeline().addLast(new MySQLNegotiateHandler(username, password, responseCallback));
+                        socketChannel.pipeline().addLast(new MySQLNegotiateHandler(connectInfo.getUsername(), connectInfo.getPassword(), responseCallback));
                         socketChannel.pipeline().addLast(new MySQLCommandResponseHandler());
                     }
-                })
-                .option(ChannelOption.AUTO_READ, true)
-                .connect(host, port).channel();
+                }).connect(connectInfo.getHost(), connectInfo.getPort()).channel();
         serverInfo = waitExpectedResponse(ServerInfo.class);
     }
     
@@ -158,7 +149,8 @@ public final class MySQLClient {
     private void registerSlave() {
         responseCallback = new DefaultPromise<>(eventLoopGroup.next());
         InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
-        MySQLComRegisterSlaveCommandPacket registerSlaveCommandPacket = new MySQLComRegisterSlaveCommandPacket(serverId, localAddress.getHostName(), username, password, localAddress.getPort());
+        MySQLComRegisterSlaveCommandPacket registerSlaveCommandPacket = new MySQLComRegisterSlaveCommandPacket(
+                connectInfo.getServerId(), localAddress.getHostName(), connectInfo.getUsername(), connectInfo.getPassword(), localAddress.getPort());
         channel.writeAndFlush(registerSlaveCommandPacket);
         waitExpectedResponse(MySQLOKPacket.class);
     }
@@ -185,9 +177,9 @@ public final class MySQLClient {
         channel.pipeline().remove(MySQLCommandResponseHandler.class);
         channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder(checksumLength));
         channel.pipeline().addLast(new MySQLBinlogEventHandler());
-        channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, serverId, binlogFileName));
+        channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName));
     }
-
+    
     /**
      * Poll binlog event.
      *
@@ -220,7 +212,7 @@ public final class MySQLClient {
         }
     }
     
-    class MySQLCommandResponseHandler extends ChannelInboundHandlerAdapter {
+    private class MySQLCommandResponseHandler extends ChannelInboundHandlerAdapter {
         
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
@@ -238,18 +230,42 @@ public final class MySQLClient {
         }
     }
     
-    class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {
+    private class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {
+        
+        private AbstractBinlogEvent lastBinlogEvent;
         
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
             if (msg instanceof AbstractBinlogEvent) {
-                blockingEventQueue.put((AbstractBinlogEvent) msg);
+                lastBinlogEvent = (AbstractBinlogEvent) msg;
+                blockingEventQueue.put(lastBinlogEvent);
             }
         }
         
         @Override
+        public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+            log.warn("channel inactive");
+            reconnect();
+        }
+        
+        @Override
         public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
             log.error("protocol resolution error", cause);
+            reconnect();
+        }
+        
+        private void reconnect() {
+            log.info("reconnect mysql client.");
+            closeOldChannel();
+            connect();
+            subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
+        }
+        
+        private void closeOldChannel() {
+            try {
+                channel.closeFuture().sync();
+            } catch (InterruptedException ignored) {
+            }
         }
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java
index 1bb7b84..6aa0322 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.mysql.client;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.Promise;
 
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
@@ -56,7 +57,7 @@ public final class MySQLClientTest {
     
     @Before
     public void setUp() {
-        mysqlClient = new MySQLClient(1, "host", 3306, "username", "password");
+        mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306, "username", "password"));
         when(channel.pipeline()).thenReturn(pipeline);
         inetSocketAddress = new InetSocketAddress("host", 3306);
         when(channel.localAddress()).thenReturn(inetSocketAddress);
@@ -75,6 +76,7 @@ public final class MySQLClientTest {
     public void assertExecute() throws NoSuchFieldException, IllegalAccessException {
         mockChannelResponse(new MySQLOKPacket(0));
         ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel);
+        ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1));
         assertTrue(mysqlClient.execute(""));
         verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class));
     }
@@ -85,6 +87,7 @@ public final class MySQLClientTest {
         ReflectionUtil.setFieldValueToClass(expected, "affectedRows", 10);
         mockChannelResponse(expected);
         ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel);
+        ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1));
         assertThat(mysqlClient.executeUpdate(""), is(10));
         verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class));
     }
@@ -94,6 +97,7 @@ public final class MySQLClientTest {
         InternalResultSet expected = new InternalResultSet(null);
         mockChannelResponse(expected);
         ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel);
+        ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1));
         assertThat(mysqlClient.executeQuery(""), is(expected));
         verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class));
     }
@@ -104,6 +108,7 @@ public final class MySQLClientTest {
         serverInfo.setServerVersion(new ServerVersion("5.5.0-log"));
         ReflectionUtil.setFieldValueToClass(mysqlClient, "serverInfo", serverInfo);
         ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel);
+        ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1));
         mockChannelResponse(new MySQLOKPacket(0));
         mysqlClient.subscribe("", 4L);
         verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComRegisterSlaveCommandPacket.class));