You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/06/22 12:50:44 UTC
[shardingsphere] branch master updated: Improve scaling MySQLClient reconnect and add max retry times and support interrupt. (#18516)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2ef68fa17ed Improve scaling MySQLClient reconnect and add max retry times and support interrupt. (#18516)
2ef68fa17ed is described below
commit 2ef68fa17eda6fc7fc03968129e6130403fea7e7
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Wed Jun 22 20:50:37 2022 +0800
Improve scaling MySQLClient reconnect and add max retry times and support interrupt. (#18516)
* Improve scaling incremental task reconnect and add max retry times
* Fix code style
* Fix unit tests
---
.../mysql/ingest/MySQLIncrementalDumper.java | 36 +++++++++------
.../pipeline/mysql/ingest/client/MySQLClient.java | 51 +++++++++++++++++-----
.../mysql/ingest/MySQLIncrementalDumperTest.java | 30 ++++++++++---
.../mysql/ingest/client/MySQLClientTest.java | 21 +++++++++
4 files changed, 108 insertions(+), 30 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 1adf68bc201..f979edde3d7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -72,6 +72,10 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
private final PipelineChannel channel;
+ private final MySQLClient client;
+
+ private final String catalog;
+
public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
super(dumperConfig, binlogPosition, channel, metaDataLoader);
@@ -80,6 +84,11 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
this.channel = channel;
this.metaDataLoader = metaDataLoader;
+ YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getJdbcConfig();
+ log.info("incremental dump, jdbcUrl={}", jdbcConfig.getJdbcUrl());
+ DataSourceMetaData metaData = DatabaseTypeFactory.getInstance("MySQL").getDataSourceMetaData(jdbcConfig.getJdbcUrl(), null);
+ client = new MySQLClient(new ConnectInfo(random.nextInt(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()));
+ catalog = metaData.getCatalog();
}
@Override
@@ -88,17 +97,13 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
}
private void dump() {
- YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getJdbcConfig();
- log.info("incremental dump, jdbcUrl={}", jdbcConfig.getJdbcUrl());
- DataSourceMetaData metaData = DatabaseTypeFactory.getInstance("MySQL").getDataSourceMetaData(jdbcConfig.getJdbcUrl(), null);
- MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()));
client.connect();
client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
int eventCount = 0;
while (isRunning()) {
AbstractBinlogEvent event = client.poll();
if (null != event) {
- handleEvent(metaData.getCatalog(), event);
+ handleEvent(catalog, event);
eventCount++;
}
}
@@ -112,11 +117,14 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
return;
}
if (event instanceof WriteRowsEvent) {
- handleWriteRowsEvent((WriteRowsEvent) event);
+ PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((WriteRowsEvent) event).getTableName());
+ handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
} else if (event instanceof UpdateRowsEvent) {
- handleUpdateRowsEvent((UpdateRowsEvent) event);
+ PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((UpdateRowsEvent) event).getTableName());
+ handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
} else if (event instanceof DeleteRowsEvent) {
- handleDeleteRowsEvent((DeleteRowsEvent) event);
+ PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((DeleteRowsEvent) event).getTableName());
+ handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
}
}
@@ -124,8 +132,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
return !event.getDatabaseName().equals(database) || !dumperConfig.containsTable(event.getTableName());
}
- private void handleWriteRowsEvent(final WriteRowsEvent event) {
- PipelineTableMetaData tableMetaData = getPipelineTableMetaData(event.getTableName());
+ private void handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
for (Serializable[] each : event.getAfterRows()) {
DataRecord record = createDataRecord(event, each.length);
record.setType(IngestDataChangeType.INSERT);
@@ -141,8 +148,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
}
- private void handleUpdateRowsEvent(final UpdateRowsEvent event) {
- PipelineTableMetaData tableMetaData = getPipelineTableMetaData(event.getTableName());
+ private void handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
Serializable[] afterValues = event.getAfterRows().get(i);
@@ -161,8 +167,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
}
}
- private void handleDeleteRowsEvent(final DeleteRowsEvent event) {
- PipelineTableMetaData tableMetaData = getPipelineTableMetaData(event.getTableName());
+ private void handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
for (Serializable[] each : event.getBeforeRows()) {
DataRecord record = createDataRecord(event, each.length);
record.setType(IngestDataChangeType.DELETE);
@@ -198,5 +203,8 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
@Override
protected void doStop() {
+ if (null != client) {
+ client.closeChannel();
+ }
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index e9b7a9e63e3..d10e5022555 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -31,6 +31,7 @@ import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLCommandPacketDecoder;
@@ -70,6 +71,10 @@ public final class MySQLClient {
private ServerInfo serverInfo;
+ private volatile boolean running = true;
+
+ private volatile int reconnectTimes;
+
/**
* Connect to MySQL.
*/
@@ -80,6 +85,7 @@ public final class MySQLClient {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.AUTO_READ, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
@@ -193,6 +199,9 @@ public final class MySQLClient {
* @return binlog event
*/
public synchronized AbstractBinlogEvent poll() {
+ if (!running) {
+ throw new PipelineJobExecutionException("binlog sync channel already closed, can't poll event");
+ }
try {
return blockingEventQueue.poll(100, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ignored) {
@@ -219,6 +228,23 @@ public final class MySQLClient {
}
}
+
+ /**
+ * Close netty channel.
+ */
+ public void closeChannel() {
+ if (null == channel || !channel.isOpen()) {
+ return;
+ }
+ try {
+ channel.close();
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.error("close channel error", ex);
+ }
+ }
+
private final class MySQLCommandResponseHandler extends ChannelInboundHandlerAdapter {
@Override
@@ -243,6 +269,9 @@ public final class MySQLClient {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+ if (!running) {
+ return;
+ }
if (msg instanceof AbstractBinlogEvent) {
lastBinlogEvent = (AbstractBinlogEvent) msg;
blockingEventQueue.put(lastBinlogEvent);
@@ -252,27 +281,29 @@ public final class MySQLClient {
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
log.warn("channel inactive");
+ if (!running) {
+ return;
+ }
+ if (reconnectTimes > 3) {
+ log.warn("exceeds the maximum number of retry times, lastBinlogEvent={}", lastBinlogEvent);
+ running = false;
+ return;
+ }
reconnect();
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ running = false;
log.error("protocol resolution error", cause);
- reconnect();
}
private void reconnect() {
- log.info("reconnect mysql client.");
- closeOldChannel();
+ reconnectTimes++;
+ log.info("reconnect mysql client, retryTimes={}", reconnectTimes);
+ closeChannel();
connect();
subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
}
-
- private void closeOldChannel() {
- try {
- channel.closeFuture().sync();
- } catch (final InterruptedException ignored) {
- }
- }
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 0706d8281f0..8ecb162cc34 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -30,6 +30,8 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -40,12 +42,17 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRo
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -53,7 +60,10 @@ import java.util.List;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public final class MySQLIncrementalDumperTest {
private MySQLIncrementalDumper incrementalDumper;
@@ -62,13 +72,19 @@ public final class MySQLIncrementalDumperTest {
private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+ @Mock
+ private PipelineTableMetaData pipelineTableMetaData;
+
@Before
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
initTableData(dumperConfig);
+ dumperConfig.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root"));
channel = new MultiplexMemoryPipelineChannel();
PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
+ PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true, true);
+ when(pipelineTableMetaData.getColumnMetaData(anyInt())).thenReturn(column);
}
private DumperConfiguration mockDumperConfiguration() {
@@ -97,14 +113,15 @@ public final class MySQLIncrementalDumperTest {
}
@Test
- public void assertWriteRowsEvent() {
+ public void assertWriteRowsEvent() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
WriteRowsEvent rowsEvent = new WriteRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
List<Serializable[]> rows = new ArrayList<>(1);
rows.add(new String[]{"1", "order"});
rowsEvent.setAfterRows(rows);
- invokeHandleEvent(rowsEvent);
+ ReflectionUtil.invokeMethod(incrementalDumper, "handleWriteRowsEvent", new Class[]{WriteRowsEvent.class, PipelineTableMetaData.class},
+ new Object[]{rowsEvent, pipelineTableMetaData});
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertThat(records.get(0), instanceOf(DataRecord.class));
@@ -112,7 +129,7 @@ public final class MySQLIncrementalDumperTest {
}
@Test
- public void assertUpdateRowsEvent() {
+ public void assertUpdateRowsEvent() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
@@ -122,7 +139,8 @@ public final class MySQLIncrementalDumperTest {
afterRows.add(new String[]{"1", "order_new"});
rowsEvent.setBeforeRows(beforeRows);
rowsEvent.setAfterRows(afterRows);
- invokeHandleEvent(rowsEvent);
+ ReflectionUtil.invokeMethod(incrementalDumper, "handleUpdateRowsEvent", new Class[]{UpdateRowsEvent.class, PipelineTableMetaData.class},
+ new Object[]{rowsEvent, pipelineTableMetaData});
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertThat(records.get(0), instanceOf(DataRecord.class));
@@ -130,14 +148,14 @@ public final class MySQLIncrementalDumperTest {
}
@Test
- public void assertDeleteRowsEvent() {
+ public void assertDeleteRowsEvent() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
List<Serializable[]> rows = new ArrayList<>(1);
rows.add(new String[]{"1", "order"});
rowsEvent.setBeforeRows(rows);
- invokeHandleEvent(rowsEvent);
+ ReflectionUtil.invokeMethod(incrementalDumper, "handleDeleteRowsEvent", new Class[]{DeleteRowsEvent.class, PipelineTableMetaData.class}, new Object[]{rowsEvent, pipelineTableMetaData});
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertThat(records.get(0), instanceOf(DataRecord.class));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index b0826f54910..1bed2530ff9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -21,6 +21,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Promise;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
@@ -36,6 +37,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.net.InetSocketAddress;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
@@ -56,6 +58,11 @@ public final class MySQLClientTest {
public void setUp() {
mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306, "username", "password"));
when(channel.pipeline()).thenReturn(pipeline);
+ when(channel.isOpen()).thenReturn(true);
+ when(channel.close()).thenAnswer(invocation -> {
+ when(channel.isOpen()).thenReturn(false);
+ return null;
+ });
when(channel.localAddress()).thenReturn(new InetSocketAddress("host", 3306));
}
@@ -130,4 +137,18 @@ public final class MySQLClientTest {
}
}).start();
}
+
+ @Test
+ public void assertCloseChannel() throws NoSuchFieldException, IllegalAccessException {
+ ReflectionUtil.setFieldValue(mysqlClient, "channel", channel);
+ mysqlClient.closeChannel();
+ assertFalse(channel.isOpen());
+ }
+
+ @Test(expected = PipelineJobExecutionException.class)
+ public void assertPollFailed() throws NoSuchFieldException, IllegalAccessException {
+ ReflectionUtil.setFieldValue(mysqlClient, "channel", channel);
+ ReflectionUtil.setFieldValue(mysqlClient, "running", false);
+ mysqlClient.poll();
+ }
}