You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/06/22 12:50:44 UTC

[shardingsphere] branch master updated: Improve scaling MySQLClient reconnect and add max retry times and support interrupt. (#18516)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ef68fa17ed Improve scaling MySQLClient reconnect and add max retry times and support interrupt. (#18516)
2ef68fa17ed is described below

commit 2ef68fa17eda6fc7fc03968129e6130403fea7e7
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Wed Jun 22 20:50:37 2022 +0800

    Improve scaling MySQLClient reconnect and add max retry times and support interrupt. (#18516)
    
    * Improve scaling increment task reconnect and add max retry times
    
    * Fix code style
    
    * Fix unit test
---
 .../mysql/ingest/MySQLIncrementalDumper.java       | 36 +++++++++------
 .../pipeline/mysql/ingest/client/MySQLClient.java  | 51 +++++++++++++++++-----
 .../mysql/ingest/MySQLIncrementalDumperTest.java   | 30 ++++++++++---
 .../mysql/ingest/client/MySQLClientTest.java       | 21 +++++++++
 4 files changed, 108 insertions(+), 30 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 1adf68bc201..f979edde3d7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -72,6 +72,10 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     
     private final PipelineChannel channel;
     
+    private final MySQLClient client;
+    
+    private final String catalog;
+    
     public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
                                   final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         super(dumperConfig, binlogPosition, channel, metaDataLoader);
@@ -80,6 +84,11 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
         Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
         this.channel = channel;
         this.metaDataLoader = metaDataLoader;
+        YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getJdbcConfig();
+        log.info("incremental dump, jdbcUrl={}", jdbcConfig.getJdbcUrl());
+        DataSourceMetaData metaData = DatabaseTypeFactory.getInstance("MySQL").getDataSourceMetaData(jdbcConfig.getJdbcUrl(), null);
+        client = new MySQLClient(new ConnectInfo(random.nextInt(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()));
+        catalog = metaData.getCatalog();
     }
     
     @Override
@@ -88,17 +97,13 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     }
     
     private void dump() {
-        YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getJdbcConfig();
-        log.info("incremental dump, jdbcUrl={}", jdbcConfig.getJdbcUrl());
-        DataSourceMetaData metaData = DatabaseTypeFactory.getInstance("MySQL").getDataSourceMetaData(jdbcConfig.getJdbcUrl(), null);
-        MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()));
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
         int eventCount = 0;
         while (isRunning()) {
             AbstractBinlogEvent event = client.poll();
             if (null != event) {
-                handleEvent(metaData.getCatalog(), event);
+                handleEvent(catalog, event);
                 eventCount++;
             }
         }
@@ -112,11 +117,14 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
             return;
         }
         if (event instanceof WriteRowsEvent) {
-            handleWriteRowsEvent((WriteRowsEvent) event);
+            PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((WriteRowsEvent) event).getTableName());
+            handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
         } else if (event instanceof UpdateRowsEvent) {
-            handleUpdateRowsEvent((UpdateRowsEvent) event);
+            PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((UpdateRowsEvent) event).getTableName());
+            handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
         } else if (event instanceof DeleteRowsEvent) {
-            handleDeleteRowsEvent((DeleteRowsEvent) event);
+            PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((DeleteRowsEvent) event).getTableName());
+            handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
         }
     }
     
@@ -124,8 +132,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
         return !event.getDatabaseName().equals(database) || !dumperConfig.containsTable(event.getTableName());
     }
     
-    private void handleWriteRowsEvent(final WriteRowsEvent event) {
-        PipelineTableMetaData tableMetaData = getPipelineTableMetaData(event.getTableName());
+    private void handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
         for (Serializable[] each : event.getAfterRows()) {
             DataRecord record = createDataRecord(event, each.length);
             record.setType(IngestDataChangeType.INSERT);
@@ -141,8 +148,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
         return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
     }
     
-    private void handleUpdateRowsEvent(final UpdateRowsEvent event) {
-        PipelineTableMetaData tableMetaData = getPipelineTableMetaData(event.getTableName());
+    private void handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);
             Serializable[] afterValues = event.getAfterRows().get(i);
@@ -161,8 +167,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
         }
     }
     
-    private void handleDeleteRowsEvent(final DeleteRowsEvent event) {
-        PipelineTableMetaData tableMetaData = getPipelineTableMetaData(event.getTableName());
+    private void handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
         for (Serializable[] each : event.getBeforeRows()) {
             DataRecord record = createDataRecord(event, each.length);
             record.setType(IngestDataChangeType.DELETE);
@@ -198,5 +203,8 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     
     @Override
     protected void doStop() {
+        if (null != client) {
+            client.closeChannel();
+        }
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index e9b7a9e63e3..d10e5022555 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -31,6 +31,7 @@ import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Promise;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLCommandPacketDecoder;
@@ -70,6 +71,10 @@ public final class MySQLClient {
     
     private ServerInfo serverInfo;
     
+    private volatile boolean running = true;
+    
+    private volatile int reconnectTimes;
+    
     /**
      * Connect to MySQL.
      */
@@ -80,6 +85,7 @@ public final class MySQLClient {
                 .group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.AUTO_READ, true)
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     
                     @Override
@@ -193,6 +199,9 @@ public final class MySQLClient {
      * @return binlog event
      */
     public synchronized AbstractBinlogEvent poll() {
+        if (!running) {
+            throw new PipelineJobExecutionException("binlog sync channel already closed, can't poll event");
+        }
         try {
             return blockingEventQueue.poll(100, TimeUnit.MILLISECONDS);
         } catch (final InterruptedException ignored) {
@@ -219,6 +228,23 @@ public final class MySQLClient {
         }
     }
     
+    
+    /**
+     * Close netty channel.
+     */
+    public void closeChannel() {
+        if (null == channel || !channel.isOpen()) {
+            return;
+        }
+        try {
+            channel.close();
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            log.error("close channel error", ex);
+        }
+    }
+    
     private final class MySQLCommandResponseHandler extends ChannelInboundHandlerAdapter {
         
         @Override
@@ -243,6 +269,9 @@ public final class MySQLClient {
         
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+            if (!running) {
+                return;
+            }
             if (msg instanceof AbstractBinlogEvent) {
                 lastBinlogEvent = (AbstractBinlogEvent) msg;
                 blockingEventQueue.put(lastBinlogEvent);
@@ -252,27 +281,29 @@ public final class MySQLClient {
         @Override
         public void channelInactive(final ChannelHandlerContext ctx) {
             log.warn("channel inactive");
+            if (!running) {
+                return;
+            }
+            if (reconnectTimes > 3) {
+                log.warn("exceeds the maximum number of retry times, lastBinlogEvent={}", lastBinlogEvent);
+                running = false;
+                return;
+            }
             reconnect();
         }
         
         @Override
         public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+            running = false;
             log.error("protocol resolution error", cause);
-            reconnect();
         }
         
         private void reconnect() {
-            log.info("reconnect mysql client.");
-            closeOldChannel();
+            reconnectTimes++;
+            log.info("reconnect mysql client, retryTimes={}", reconnectTimes);
+            closeChannel();
             connect();
             subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
         }
-        
-        private void closeOldChannel() {
-            try {
-                channel.closeFuture().sync();
-            } catch (final InterruptedException ignored) {
-            }
-        }
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 0706d8281f0..8ecb162cc34 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -30,6 +30,8 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -40,12 +42,17 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRo
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.sql.DataSource;
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -53,7 +60,10 @@ import java.util.List;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.class)
 public final class MySQLIncrementalDumperTest {
     
     private MySQLIncrementalDumper incrementalDumper;
@@ -62,13 +72,19 @@ public final class MySQLIncrementalDumperTest {
     
     private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
     
+    @Mock
+    private PipelineTableMetaData pipelineTableMetaData;
+    
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
         initTableData(dumperConfig);
+        dumperConfig.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root"));
         channel = new MultiplexMemoryPipelineChannel();
         PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
         incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
+        PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true, true);
+        when(pipelineTableMetaData.getColumnMetaData(anyInt())).thenReturn(column);
     }
     
     private DumperConfiguration mockDumperConfiguration() {
@@ -97,14 +113,15 @@ public final class MySQLIncrementalDumperTest {
     }
     
     @Test
-    public void assertWriteRowsEvent() {
+    public void assertWriteRowsEvent() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
         WriteRowsEvent rowsEvent = new WriteRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
         List<Serializable[]> rows = new ArrayList<>(1);
         rows.add(new String[]{"1", "order"});
         rowsEvent.setAfterRows(rows);
-        invokeHandleEvent(rowsEvent);
+        ReflectionUtil.invokeMethod(incrementalDumper, "handleWriteRowsEvent", new Class[]{WriteRowsEvent.class, PipelineTableMetaData.class},
+                new Object[]{rowsEvent, pipelineTableMetaData});
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertThat(records.get(0), instanceOf(DataRecord.class));
@@ -112,7 +129,7 @@ public final class MySQLIncrementalDumperTest {
     }
     
     @Test
-    public void assertUpdateRowsEvent() {
+    public void assertUpdateRowsEvent() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
         UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
@@ -122,7 +139,8 @@ public final class MySQLIncrementalDumperTest {
         afterRows.add(new String[]{"1", "order_new"});
         rowsEvent.setBeforeRows(beforeRows);
         rowsEvent.setAfterRows(afterRows);
-        invokeHandleEvent(rowsEvent);
+        ReflectionUtil.invokeMethod(incrementalDumper, "handleUpdateRowsEvent", new Class[]{UpdateRowsEvent.class, PipelineTableMetaData.class},
+                new Object[]{rowsEvent, pipelineTableMetaData});
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertThat(records.get(0), instanceOf(DataRecord.class));
@@ -130,14 +148,14 @@ public final class MySQLIncrementalDumperTest {
     }
     
     @Test
-    public void assertDeleteRowsEvent() {
+    public void assertDeleteRowsEvent() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
         DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
         List<Serializable[]> rows = new ArrayList<>(1);
         rows.add(new String[]{"1", "order"});
         rowsEvent.setBeforeRows(rows);
-        invokeHandleEvent(rowsEvent);
+        ReflectionUtil.invokeMethod(incrementalDumper, "handleDeleteRowsEvent", new Class[]{DeleteRowsEvent.class, PipelineTableMetaData.class}, new Object[]{rowsEvent, pipelineTableMetaData});
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertThat(records.get(0), instanceOf(DataRecord.class));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index b0826f54910..1bed2530ff9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -21,6 +21,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.Promise;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
@@ -36,6 +37,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.net.InetSocketAddress;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.verify;
@@ -56,6 +58,11 @@ public final class MySQLClientTest {
     public void setUp() {
         mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306, "username", "password"));
         when(channel.pipeline()).thenReturn(pipeline);
+        when(channel.isOpen()).thenReturn(true);
+        when(channel.close()).thenAnswer(invocation -> {
+            when(channel.isOpen()).thenReturn(false);
+            return null;
+        });
         when(channel.localAddress()).thenReturn(new InetSocketAddress("host", 3306));
     }
     
@@ -130,4 +137,18 @@ public final class MySQLClientTest {
             }
         }).start();
     }
+    
+    @Test
+    public void assertCloseChannel() throws NoSuchFieldException, IllegalAccessException {
+        ReflectionUtil.setFieldValue(mysqlClient, "channel", channel);
+        mysqlClient.closeChannel();
+        assertFalse(channel.isOpen());
+    }
+    
+    @Test(expected = PipelineJobExecutionException.class)
+    public void assertPollFailed() throws NoSuchFieldException, IllegalAccessException {
+        ReflectionUtil.setFieldValue(mysqlClient, "channel", channel);
+        ReflectionUtil.setFieldValue(mysqlClient, "running", false);
+        mysqlClient.poll();
+    }
 }