You are viewing a plain text version of this content; the hyperlink to the canonical version was lost in conversion and is available in the original HTML message.
Posted to commits@flink.apache.org by re...@apache.org on 2022/07/18 08:43:29 UTC

[flink] branch release-1.15 updated: [FLINK-28250][Connector/Kafka] Fix exactly-once Kafka Sink out of memory

This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new adbf09fb941 [FLINK-28250][Connector/Kafka] Fix exactly-once Kafka Sink out of memory
adbf09fb941 is described below

commit adbf09fb941c8f793df6d322ed95df87bc4254f3
Author: Charles Tan <ct...@gmail.com>
AuthorDate: Thu Jul 14 00:44:45 2022 -0700

    [FLINK-28250][Connector/Kafka] Fix exactly-once Kafka Sink out of memory
---
 .../flink/connector/kafka/sink/KafkaCommitter.java |  5 ++++
 .../connector/kafka/sink/KafkaCommitterTest.java   | 27 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index d2873dde40b..4dbeaf9e715 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -72,6 +72,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                                 .orElseGet(() -> getRecoveryProducer(committable));
                 producer.commitTransaction();
                 producer.flush();
+                recyclable.ifPresent(Recyclable::close);
             } catch (RetriableException e) {
                 LOG.warn(
                         "Encountered retriable exception while committing {}.", transactionalId, e);
@@ -90,6 +91,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                         ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                         kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
                         e);
+                recyclable.ifPresent(Recyclable::close);
                 request.signalFailedWithKnownReason(e);
             } catch (InvalidTxnStateException e) {
                 // This exception only occurs when aborting after a commit or vice versa.
@@ -99,12 +101,15 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                                 + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
                         request,
                         e);
+                recyclable.ifPresent(Recyclable::close);
                 request.signalFailedWithKnownReason(e);
             } catch (UnknownProducerIdException e) {
                 LOG.error(
                         "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
                         request,
                         e);
+                recyclable.ifPresent(Recyclable::close);
+                request.signalFailedWithKnownReason(e);
             } catch (Exception e) {
                 LOG.error(
                         "Transaction ({}) encountered error and data has been potentially lost.",
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
index 30bbdca7889..8def81a38cf 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -85,6 +86,32 @@ public class KafkaCommitterTest {
         }
     }
 
+    @Test
+    public void testKafkaCommitterClosesProducer() throws IOException, InterruptedException {
+        Properties properties = getProperties();
+        FlinkKafkaInternalProducer<Object, Object> producer =
+                new FlinkKafkaInternalProducer(properties, TRANSACTIONAL_ID) {
+                    @Override
+                    public void commitTransaction() throws ProducerFencedException {}
+
+                    @Override
+                    public void flush() {}
+
+                    @Override
+                    public void close() {}
+                };
+        try (final KafkaCommitter committer = new KafkaCommitter(properties);
+                Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
+                        new Recyclable<>(producer, p -> {})) {
+            final MockCommitRequest<KafkaCommittable> request =
+                    new MockCommitRequest<>(
+                            new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
+
+            committer.commit(Collections.singletonList(request));
+            assertThat(recyclable.isRecycled()).isTrue();
+        }
+    }
+
     Properties getProperties() {
         Properties properties = new Properties();
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:1");