Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/07 06:26:34 UTC

[4/8] flink git commit: [FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE FlinkKafkaProducer011

[FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE FlinkKafkaProducer011

Previously, the internal FlinkKafkaProducer was not closed for the AT_LEAST_ONCE and NONE
semantics when FlinkKafkaProducer011 was closed. This leaked resources (for example,
a growing number of active threads).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54f104bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54f104bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54f104bd

Branch: refs/heads/master
Commit: 54f104bd425ca709babd932dd494d33957df0b1d
Parents: a7ed135
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu May 3 15:50:53 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 10:56:27 2018 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java          | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54f104bd/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index e92f38b..0ae5e03b 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -651,6 +651,17 @@ public class FlinkKafkaProducer011<IN>
 		if (currentTransaction != null) {
 			// to avoid exceptions on aborting transactions with some pending records
 			flush(currentTransaction);
+
+			// the normal abort for AT_LEAST_ONCE and NONE does not clean up resources because the producer is
+			// reused, thus we need to close it manually
+			switch (semantic) {
+				case EXACTLY_ONCE:
+					break;
+				case AT_LEAST_ONCE:
+				case NONE:
+					currentTransaction.producer.close();
+					break;
+			}
 		}
 		try {
 			super.close();
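
The close-time cleanup in the hunk above can be illustrated in isolation with a minimal sketch. It does not use the Flink or Kafka classes: `StubProducer`, `StubSink`, `abortCurrent()` and `openProducers()` are hypothetical names invented for this illustration, and the assumption that the abort path releases the producer only for EXACTLY_ONCE is taken from the code comment in the diff, not verified against the rest of the class.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerCloseSketch {

    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Hypothetical stand-in for a producer; tracks how many instances are still open. */
    static final class StubProducer {
        private static final AtomicInteger OPEN = new AtomicInteger();
        private boolean closed;

        StubProducer() {
            OPEN.incrementAndGet();
        }

        void flush() {
            // nothing is buffered in this sketch
        }

        void close() {
            // idempotent, so a double close does not corrupt the counter
            if (!closed) {
                closed = true;
                OPEN.decrementAndGet();
            }
        }
    }

    /** Hypothetical sink that owns one producer and mirrors the switch from the diff. */
    static final class StubSink {
        private final Semantic semantic;
        private StubProducer current = new StubProducer();

        StubSink(Semantic semantic) {
            this.semantic = semantic;
        }

        /** Assumption: abort releases the producer only in the transactional case. */
        private void abortCurrent() {
            if (semantic == Semantic.EXACTLY_ONCE) {
                current.close();
            }
        }

        void close() {
            if (current != null) {
                current.flush();
                switch (semantic) {
                    case EXACTLY_ONCE:
                        // left to the abort path below
                        break;
                    case AT_LEAST_ONCE:
                    case NONE:
                        // abort will not release the reused producer, so close it here
                        current.close();
                        break;
                }
                abortCurrent();
                current = null;
            }
        }
    }

    static int openProducers() {
        return StubProducer.OPEN.get();
    }

    public static void main(String[] args) {
        for (Semantic semantic : Semantic.values()) {
            new StubSink(semantic).close();
        }
        System.out.println("open producers after close: " + openProducers());
    }
}
```

Removing the `AT_LEAST_ONCE` / `NONE` branch from `StubSink.close()` leaves two stub producers open after `main` runs, which is the same shape as the leak described in the commit message.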