You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:21:40 UTC

[rocketmq-connect] 04/07: [ISSUE #341] Call wakeup() before closing the Kafka consumer to interrupt a blocked poll() (#342)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 598de5dbacd6e3c4c643f07bc64e0903447fd7b3
Author: jonnxu <jo...@163.com>
AuthorDate: Fri Jul 26 21:30:56 2019 +0800

    [ISSUE #341] Call wakeup() before closing the Kafka consumer to interrupt a blocked poll() (#342)
---
 .../org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java     | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index d4b39e0..1f7ed00 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -122,6 +122,7 @@ public class KafkaSourceTask extends SourceTask {
         log.info("source task stop enter");
         try {
             commitOffset(currentTPList, true);
+            consumer.wakeup(); // wakeup poll in other thread
             consumer.close();
         } catch (Exception e) {
             log.warn("{} consumer {} close exception {}", this, consumer, e);