You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:47 UTC
[rocketmq-connect] 39/43: [ISSUE #558] An ugly solution for fetch topic list error
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 1a49e60c049417dbc24df8e7f6b9fb5ac7ba0b71
Author: affe <af...@gmail.com>
AuthorDate: Mon Jul 27 20:28:21 2020 +0800
[ISSUE #558] An ugly solution for fetch topic list error
---
.../apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index 6a41646..53379ec 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -11,6 +11,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -43,6 +44,7 @@ public class JdbcSinkConnector extends SinkConnector {
private volatile boolean adminStarted;
+ private ScheduledFuture<?> listenerHandle;
public JdbcSinkConnector() {
topicRouteMap = new HashMap<>();
dbConnectorConfig = new SinkDbConnectorConfig();
@@ -94,7 +96,7 @@ public class JdbcSinkConnector extends SinkConnector {
}
public void startListener() {
- executor.scheduleAtFixedRate(new Runnable() {
+ listenerHandle = executor.scheduleAtFixedRate(new Runnable() {
boolean first = true;
HashMap<String, Set<TaskTopicInfo>> origin = null;
@@ -169,9 +171,10 @@ public class JdbcSinkConnector extends SinkConnector {
}
}
+
@Override
public void stop() {
-
+ listenerHandle.cancel(true);
}
@Override