You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/03/15 02:59:29 UTC

[incubator-seatunnel] branch dev updated: [bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ac61409bd [bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)
ac61409bd is described below

commit ac61409bd845cd90ff04961ed25ace2820f92aea
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Wed Mar 15 10:59:24 2023 +0800

    [bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)
    
    
    
    ---------
    
    Co-authored-by: TaoZex <45...@users.noreply.github.com>
---
 .../cdc/base/source/reader/external/FetchTask.java |  5 ++++
 .../external/IncrementalSourceScanFetcher.java     | 11 +++++++--
 .../external/IncrementalSourceStreamFetcher.java   | 11 +++++++--
 .../seatunnel/cdc/mysql/source/MySqlDialect.java   | 11 +--------
 .../reader/fetch/MySqlSourceFetchTaskContext.java  | 27 +++++++++++++++++-----
 .../reader/fetch/binlog/MySqlBinlogFetchTask.java  |  5 ++++
 .../reader/fetch/scan/MySqlSnapshotFetchTask.java  |  5 ++++
 .../seatunnel/cdc/mysql/utils/MySqlUtils.java      |  1 +
 .../sqlserver/source/source/SqlServerDialect.java  |  8 +------
 .../fetch/SqlServerSourceFetchTaskContext.java     | 25 +++++++++++++++-----
 .../fetch/scan/SqlServerSnapshotFetchTask.java     |  5 ++++
 .../SqlServerTransactionLogFetchTask.java          |  5 ++++
 12 files changed, 86 insertions(+), 33 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java
index 7a8ba5fe3..1cc61bd77 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java
@@ -41,6 +41,9 @@ public interface FetchTask<Split> {
     /** Returns current task is running or not. */
     boolean isRunning();
 
+    /** Shuts down this task, marking it as no longer running. */
+    void shutdown();
+
     /** Returns the split that the task used. */
     Split getSplit();
 
@@ -63,5 +66,7 @@ public interface FetchTask<Split> {
         void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord);
 
         List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords);
+
+        void close();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 25bf35429..56f59ba08 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -192,13 +192,20 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
     @Override
     public void close() {
         try {
+            if (taskContext != null) {
+                taskContext.close();
+            }
+            if (snapshotSplitReadTask != null) {
+                snapshotSplitReadTask.shutdown();
+            }
             if (executorService != null) {
                 executorService.shutdown();
-                if (executorService.awaitTermination(
+                if (!executorService.awaitTermination(
                         READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                     log.warn(
-                            "Failed to close the scan fetcher in {} seconds.",
+                            "Failed to close the scan fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
                             READER_CLOSE_TIMEOUT_SECONDS);
+                    executorService.shutdownNow();
                 }
             }
         } catch (Exception e) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index f14eac745..f3715e710 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -121,13 +121,20 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
     @Override
     public void close() {
         try {
+            if (taskContext != null) {
+                taskContext.close();
+            }
+            if (streamFetchTask != null) {
+                streamFetchTask.shutdown();
+            }
             if (executorService != null) {
                 executorService.shutdown();
-                if (executorService.awaitTermination(
+                if (!executorService.awaitTermination(
                         READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                     log.warn(
-                            "Failed to close the stream fetcher in {} seconds.",
+                            "Failed to close the stream fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
                             READER_CLOSE_TIMEOUT_SECONDS);
+                    executorService.shutdownNow();
                 }
             }
         } catch (Exception e) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index 1352a78e0..15d3b6bf7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -33,8 +33,6 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.s
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
 
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import io.debezium.connector.mysql.MySqlConnection;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
@@ -42,8 +40,6 @@ import io.debezium.relational.history.TableChanges;
 import java.sql.SQLException;
 import java.util.List;
 
-import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
-import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;
 
 /** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -104,12 +100,7 @@ public class MySqlDialect implements JdbcDataSourceDialect {
     @Override
     public MySqlSourceFetchTaskContext createFetchTaskContext(
             SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
-        final MySqlConnection jdbcConnection =
-                createMySqlConnection(taskSourceConfig.getDbzConfiguration());
-        final BinaryLogClient binaryLogClient =
-                createBinaryClient(taskSourceConfig.getDbzConfiguration());
-        return new MySqlSourceFetchTaskContext(
-                taskSourceConfig, this, jdbcConnection, binaryLogClient);
+        return new MySqlSourceFetchTaskContext(taskSourceConfig, this);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index b32f62f6b..c6aebc8aa 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -62,15 +62,21 @@ import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Collect;
+import lombok.extern.slf4j.Slf4j;
 
+import java.io.IOException;
+import java.sql.SQLException;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
 
 /** The context for fetch task that fetching data of snapshot split from MySQL data source. */
+@Slf4j
 public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
@@ -89,13 +95,10 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
     private MySqlErrorHandler errorHandler;
 
     public MySqlSourceFetchTaskContext(
-            JdbcSourceConfig sourceConfig,
-            JdbcDataSourceDialect dataSourceDialect,
-            MySqlConnection connection,
-            BinaryLogClient binaryLogClient) {
+            JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
         super(sourceConfig, dataSourceDialect);
-        this.connection = connection;
-        this.binaryLogClient = binaryLogClient;
+        this.connection = createMySqlConnection(sourceConfig.getDbzConfiguration());
+        this.binaryLogClient = createBinaryClient(sourceConfig.getDbzConfiguration());
         this.metadataProvider = new MySqlEventMetadataProvider();
     }
 
@@ -159,6 +162,18 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
         this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
     }
 
+    @Override
+    public void close() {
+        try {
+            this.connection.close();
+            this.binaryLogClient.disconnect();
+        } catch (SQLException e) {
+            log.warn("Failed to close connection", e);
+        } catch (IOException e) {
+            log.warn("Failed to close binaryLogClient", e);
+        }
+    }
+
     @Override
     public MySqlSourceConfig getSourceConfig() {
         return (MySqlSourceConfig) sourceConfig;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
index d59070a2f..f858864e7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
@@ -81,6 +81,11 @@ public class MySqlBinlogFetchTask implements FetchTask<SourceSplitBase> {
         return taskRunning;
     }
 
+    @Override
+    public void shutdown() {
+        taskRunning = false;
+    }
+
     @Override
     public SourceSplitBase getSplit() {
         return split;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
index 6c0bea13e..ac997f7ed 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
@@ -155,6 +155,11 @@ public class MySqlSnapshotFetchTask implements FetchTask<SourceSplitBase> {
         return taskRunning;
     }
 
+    @Override
+    public void shutdown() {
+        taskRunning = false;
+    }
+
     @Override
     public SourceSplitBase getSplit() {
         return split;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 1f2fb0973..b9ffe6034 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -339,6 +339,7 @@ public class MySqlUtils {
     private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize)
             throws SQLException {
         final Connection connection = jdbc.connection();
+        // Run in a transaction so MySQL holds metadata locks that block table structure changes.
         connection.setAutoCommit(false);
         final PreparedStatement statement =
                 connection.prepareStatement(
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index cdef63119..0494cd98e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -34,7 +34,6 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.rea
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils;
 
-import io.debezium.connector.sqlserver.SqlServerConnection;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
@@ -104,13 +103,8 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
     @Override
     public SqlServerSourceFetchTaskContext createFetchTaskContext(
             SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
-        final SqlServerConnection jdbcConnection =
-                createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
-        final SqlServerConnection metaDataConnection =
-                createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
 
-        return new SqlServerSourceFetchTaskContext(
-                taskSourceConfig, this, jdbcConnection, metaDataConnection);
+        return new SqlServerSourceFetchTaskContext(taskSourceConfig, this);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index 6dd8e7d40..200806f39 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -58,13 +58,18 @@ import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Collect;
+import lombok.extern.slf4j.Slf4j;
 
+import java.sql.SQLException;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
+
 /** The context for fetch task that fetching data of snapshot split from MySQL data source. */
+@Slf4j
 public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
 
     private final SqlServerConnection dataConnection;
@@ -83,13 +88,11 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
     private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
 
     public SqlServerSourceFetchTaskContext(
-            JdbcSourceConfig sourceConfig,
-            JdbcDataSourceDialect dataSourceDialect,
-            SqlServerConnection dataConnection,
-            SqlServerConnection metadataConnection) {
+            JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
         super(sourceConfig, dataSourceDialect);
-        this.dataConnection = dataConnection;
-        this.metadataConnection = metadataConnection;
+
+        this.dataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
+        this.metadataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
         this.metadataProvider = new SqlServerEventMetadataProvider();
     }
 
@@ -158,6 +161,16 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
         this.errorHandler = new SqlServerErrorHandler(connectorConfig.getLogicalName(), queue);
     }
 
+    @Override
+    public void close() {
+        try {
+            this.dataConnection.close();
+            this.metadataConnection.close();
+        } catch (SQLException e) {
+            log.warn("Failed to close connection", e);
+        }
+    }
+
     @Override
     public SqlServerSourceConfig getSourceConfig() {
         return (SqlServerSourceConfig) sourceConfig;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
index a8246fa24..82d43971e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
@@ -166,6 +166,11 @@ public class SqlServerSnapshotFetchTask implements FetchTask<SourceSplitBase> {
         return taskRunning;
     }
 
+    @Override
+    public void shutdown() {
+        taskRunning = false;
+    }
+
     @Override
     public SourceSplitBase getSplit() {
         return split;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java
index 21a6af3a5..f6968b621 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java
@@ -78,6 +78,11 @@ public class SqlServerTransactionLogFetchTask implements FetchTask<SourceSplitBa
         return taskRunning;
     }
 
+    @Override
+    public void shutdown() {
+        taskRunning = false;
+    }
+
     @Override
     public SourceSplitBase getSplit() {
         return split;