You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:21:40 UTC
[rocketmq-connect] 04/07: [ISSUE #341] Add wakeup before kafka consumer close to wakeup consumer poll (#342)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 598de5dbacd6e3c4c643f07bc64e0903447fd7b3
Author: jonnxu <jo...@163.com>
AuthorDate: Fri Jul 26 21:30:56 2019 +0800
[ISSUE #341] Add wakeup before kafka consumer close to wakeup consumer poll (#342)
---
.../org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index d4b39e0..1f7ed00 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -122,6 +122,7 @@ public class KafkaSourceTask extends SourceTask {
log.info("source task stop enter");
try {
commitOffset(currentTPList, true);
+ consumer.wakeup(); // wakeup poll in other thread
consumer.close();
} catch (Exception e) {
log.warn("{} consumer {} close exception {}", this, consumer, e);