You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/03/15 02:59:29 UTC
[incubator-seatunnel] branch dev updated: [bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ac61409bd [bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)
ac61409bd is described below
commit ac61409bd845cd90ff04961ed25ace2820f92aea
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Wed Mar 15 10:59:24 2023 +0800
[bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)
---------
Co-authored-by: TaoZex <45...@users.noreply.github.com>
---
.../cdc/base/source/reader/external/FetchTask.java | 5 ++++
.../external/IncrementalSourceScanFetcher.java | 11 +++++++--
.../external/IncrementalSourceStreamFetcher.java | 11 +++++++--
.../seatunnel/cdc/mysql/source/MySqlDialect.java | 11 +--------
.../reader/fetch/MySqlSourceFetchTaskContext.java | 27 +++++++++++++++++-----
.../reader/fetch/binlog/MySqlBinlogFetchTask.java | 5 ++++
.../reader/fetch/scan/MySqlSnapshotFetchTask.java | 5 ++++
.../seatunnel/cdc/mysql/utils/MySqlUtils.java | 1 +
.../sqlserver/source/source/SqlServerDialect.java | 8 +------
.../fetch/SqlServerSourceFetchTaskContext.java | 25 +++++++++++++++-----
.../fetch/scan/SqlServerSnapshotFetchTask.java | 5 ++++
.../SqlServerTransactionLogFetchTask.java | 5 ++++
12 files changed, 86 insertions(+), 33 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java
index 7a8ba5fe3..1cc61bd77 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java
@@ -41,6 +41,9 @@ public interface FetchTask<Split> {
/** Returns current task is running or not. */
boolean isRunning();
+ /** Close this task */
+ void shutdown();
+
/** Returns the split that the task used. */
Split getSplit();
@@ -63,5 +66,7 @@ public interface FetchTask<Split> {
void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord);
List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords);
+
+ void close();
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 25bf35429..56f59ba08 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -192,13 +192,20 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
@Override
public void close() {
try {
+ if (taskContext != null) {
+ taskContext.close();
+ }
+ if (snapshotSplitReadTask != null) {
+ snapshotSplitReadTask.shutdown();
+ }
if (executorService != null) {
executorService.shutdown();
- if (executorService.awaitTermination(
+ if (!executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn(
- "Failed to close the scan fetcher in {} seconds.",
+ "Failed to close the scan fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
READER_CLOSE_TIMEOUT_SECONDS);
+ executorService.shutdownNow();
}
}
} catch (Exception e) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index f14eac745..f3715e710 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -121,13 +121,20 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
@Override
public void close() {
try {
+ if (taskContext != null) {
+ taskContext.close();
+ }
+ if (streamFetchTask != null) {
+ streamFetchTask.shutdown();
+ }
if (executorService != null) {
executorService.shutdown();
- if (executorService.awaitTermination(
+ if (!executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn(
- "Failed to close the stream fetcher in {} seconds.",
+ "Failed to close the stream fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
READER_CLOSE_TIMEOUT_SECONDS);
+ executorService.shutdownNow();
}
}
} catch (Exception e) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index 1352a78e0..15d3b6bf7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -33,8 +33,6 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.s
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
@@ -42,8 +40,6 @@ import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
import java.util.List;
-import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
-import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;
/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -104,12 +100,7 @@ public class MySqlDialect implements JdbcDataSourceDialect {
@Override
public MySqlSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
- final MySqlConnection jdbcConnection =
- createMySqlConnection(taskSourceConfig.getDbzConfiguration());
- final BinaryLogClient binaryLogClient =
- createBinaryClient(taskSourceConfig.getDbzConfiguration());
- return new MySqlSourceFetchTaskContext(
- taskSourceConfig, this, jdbcConnection, binaryLogClient);
+ return new MySqlSourceFetchTaskContext(taskSourceConfig, this);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index b32f62f6b..c6aebc8aa 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -62,15 +62,21 @@ import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
+import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
+@Slf4j
public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
@@ -89,13 +95,10 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private MySqlErrorHandler errorHandler;
public MySqlSourceFetchTaskContext(
- JdbcSourceConfig sourceConfig,
- JdbcDataSourceDialect dataSourceDialect,
- MySqlConnection connection,
- BinaryLogClient binaryLogClient) {
+ JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
- this.connection = connection;
- this.binaryLogClient = binaryLogClient;
+ this.connection = createMySqlConnection(sourceConfig.getDbzConfiguration());
+ this.binaryLogClient = createBinaryClient(sourceConfig.getDbzConfiguration());
this.metadataProvider = new MySqlEventMetadataProvider();
}
@@ -159,6 +162,18 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
}
+ @Override
+ public void close() {
+ try {
+ this.connection.close();
+ this.binaryLogClient.disconnect();
+ } catch (SQLException e) {
+ log.warn("Failed to close connection", e);
+ } catch (IOException e) {
+ log.warn("Failed to close binaryLogClient", e);
+ }
+ }
+
@Override
public MySqlSourceConfig getSourceConfig() {
return (MySqlSourceConfig) sourceConfig;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
index d59070a2f..f858864e7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
@@ -81,6 +81,11 @@ public class MySqlBinlogFetchTask implements FetchTask<SourceSplitBase> {
return taskRunning;
}
+ @Override
+ public void shutdown() {
+ taskRunning = false;
+ }
+
@Override
public SourceSplitBase getSplit() {
return split;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
index 6c0bea13e..ac997f7ed 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
@@ -155,6 +155,11 @@ public class MySqlSnapshotFetchTask implements FetchTask<SourceSplitBase> {
return taskRunning;
}
+ @Override
+ public void shutdown() {
+ taskRunning = false;
+ }
+
@Override
public SourceSplitBase getSplit() {
return split;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 1f2fb0973..b9ffe6034 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -339,6 +339,7 @@ public class MySqlUtils {
private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize)
throws SQLException {
final Connection connection = jdbc.connection();
+ // Add MySQL metadata locks to prevent modification of table structure.
connection.setAutoCommit(false);
final PreparedStatement statement =
connection.prepareStatement(
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index cdef63119..0494cd98e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -34,7 +34,6 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.rea
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils;
-import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
@@ -104,13 +103,8 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
@Override
public SqlServerSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
- final SqlServerConnection jdbcConnection =
- createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
- final SqlServerConnection metaDataConnection =
- createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
- return new SqlServerSourceFetchTaskContext(
- taskSourceConfig, this, jdbcConnection, metaDataConnection);
+ return new SqlServerSourceFetchTaskContext(taskSourceConfig, this);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index 6dd8e7d40..200806f39 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -58,13 +58,18 @@ import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
+import lombok.extern.slf4j.Slf4j;
+import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
+
/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
+@Slf4j
public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private final SqlServerConnection dataConnection;
@@ -83,13 +88,11 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
public SqlServerSourceFetchTaskContext(
- JdbcSourceConfig sourceConfig,
- JdbcDataSourceDialect dataSourceDialect,
- SqlServerConnection dataConnection,
- SqlServerConnection metadataConnection) {
+ JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
- this.dataConnection = dataConnection;
- this.metadataConnection = metadataConnection;
+
+ this.dataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
+ this.metadataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.metadataProvider = new SqlServerEventMetadataProvider();
}
@@ -158,6 +161,16 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
this.errorHandler = new SqlServerErrorHandler(connectorConfig.getLogicalName(), queue);
}
+ @Override
+ public void close() {
+ try {
+ this.dataConnection.close();
+ this.metadataConnection.close();
+ } catch (SQLException e) {
+ log.warn("Failed to close connection", e);
+ }
+ }
+
@Override
public SqlServerSourceConfig getSourceConfig() {
return (SqlServerSourceConfig) sourceConfig;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
index a8246fa24..82d43971e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
@@ -166,6 +166,11 @@ public class SqlServerSnapshotFetchTask implements FetchTask<SourceSplitBase> {
return taskRunning;
}
+ @Override
+ public void shutdown() {
+ taskRunning = false;
+ }
+
@Override
public SourceSplitBase getSplit() {
return split;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java
index 21a6af3a5..f6968b621 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java
@@ -78,6 +78,11 @@ public class SqlServerTransactionLogFetchTask implements FetchTask<SourceSplitBa
return taskRunning;
}
+ @Override
+ public void shutdown() {
+ taskRunning = false;
+ }
+
@Override
public SourceSplitBase getSplit() {
return split;