You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/07 06:26:34 UTC
[4/8] flink git commit: [FLINK-9287][kafka] Properly clean up
resources in non EXACTLY_ONCE FlinkKafkaProducer011
[FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE FlinkKafkaProducer011
Previously, FlinkKafkaProducer was not being closed for the AT_LEAST_ONCE and NONE semantics
when closing FlinkKafkaProducer011. This led to resource leaks (for example,
an increasing number of active threads).
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54f104bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54f104bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54f104bd
Branch: refs/heads/master
Commit: 54f104bd425ca709babd932dd494d33957df0b1d
Parents: a7ed135
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu May 3 15:50:53 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 10:56:27 2018 +0800
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer011.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/54f104bd/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index e92f38b..0ae5e03b 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -651,6 +651,17 @@ public class FlinkKafkaProducer011<IN>
if (currentTransaction != null) {
// to avoid exceptions on aborting transactions with some pending records
flush(currentTransaction);
+
+ // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+ // we need to close it manually
+ switch (semantic) {
+ case EXACTLY_ONCE:
+ break;
+ case AT_LEAST_ONCE:
+ case NONE:
+ currentTransaction.producer.close();
+ break;
+ }
}
try {
super.close();