You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:31 UTC

[rocketmq-flink] 18/33: Producer failed to shutdown when exception happened (#388)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 0698d86fb318da566c0744ba5737e416a7536dc1
Author: shangan <ch...@163.com>
AuthorDate: Mon Aug 19 14:41:39 2019 +0800

    Producer failed to shutdown when exception happened (#388)
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index ca6848d..e8f237f 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -181,7 +181,12 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
     @Override
     public void close() throws Exception {
         if (producer != null) {
-            flushSync();
+            try {
+                flushSync();
+            } catch (Exception e) {
+                LOG.error("FlushSync failure!", e);
+            }
+            // always shut down the producer, even if the flush above failed, so that the current producerGroup is unregistered
             producer.shutdown();
         }
     }