You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:47 UTC

[rocketmq-connect] 39/43: [ISSUE #558] An ugly solution for fetch topic list error

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 1a49e60c049417dbc24df8e7f6b9fb5ac7ba0b71
Author: affe <af...@gmail.com>
AuthorDate: Mon Jul 27 20:28:21 2020 +0800

    [ISSUE #558] An ugly solution for fetch topic list error
---
 .../apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index 6a41646..53379ec 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -11,6 +11,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -43,6 +44,7 @@ public class JdbcSinkConnector extends SinkConnector {
 
     private volatile boolean adminStarted;
 
+    private ScheduledFuture<?> listenerHandle;
     public JdbcSinkConnector() {
         topicRouteMap = new HashMap<>();
         dbConnectorConfig = new SinkDbConnectorConfig();
@@ -94,7 +96,7 @@ public class JdbcSinkConnector extends SinkConnector {
     }
 
     public void startListener() {
-        executor.scheduleAtFixedRate(new Runnable() {
+        listenerHandle = executor.scheduleAtFixedRate(new Runnable() {
             boolean first = true;
             HashMap<String, Set<TaskTopicInfo>> origin = null;
 
@@ -169,9 +171,10 @@ public class JdbcSinkConnector extends SinkConnector {
         }
     }
 
+
     @Override
     public void stop() {
-
+        listenerHandle.cancel(true);
     }
 
     @Override