You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:31 UTC
[rocketmq-flink] 18/33: Producer failed to shutdown when exception happened (#388)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 0698d86fb318da566c0744ba5737e416a7536dc1
Author: shangan <ch...@163.com>
AuthorDate: Mon Aug 19 14:41:39 2019 +0800
Producer failed to shutdown when exception happened (#388)
---
src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index ca6848d..e8f237f 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -181,7 +181,12 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
@Override
public void close() throws Exception {
if (producer != null) {
- flushSync();
+ try {
+ flushSync();
+ } catch (Exception e) {
+ LOG.error("FlushSync failure!", e);
+ }
+ // make sure producer can be shutdown, thus current producerGroup will be unregistered
producer.shutdown();
}
}