You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by re...@apache.org on 2022/07/18 08:43:29 UTC
[flink] branch release-1.15 updated: [FLINK-28250][Connector/Kafka] Fix exactly-once Kafka Sink out of memory
This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new adbf09fb941 [FLINK-28250][Connector/Kafka] Fix exactly-once Kafka Sink out of memory
adbf09fb941 is described below
commit adbf09fb941c8f793df6d322ed95df87bc4254f3
Author: Charles Tan <ct...@gmail.com>
AuthorDate: Thu Jul 14 00:44:45 2022 -0700
[FLINK-28250][Connector/Kafka] Fix exactly-once Kafka Sink out of memory
---
.../flink/connector/kafka/sink/KafkaCommitter.java | 5 ++++
.../connector/kafka/sink/KafkaCommitterTest.java | 27 ++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index d2873dde40b..4dbeaf9e715 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -72,6 +72,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
.orElseGet(() -> getRecoveryProducer(committable));
producer.commitTransaction();
producer.flush();
+ recyclable.ifPresent(Recyclable::close);
} catch (RetriableException e) {
LOG.warn(
"Encountered retriable exception while committing {}.", transactionalId, e);
@@ -90,6 +91,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
e);
+ recyclable.ifPresent(Recyclable::close);
request.signalFailedWithKnownReason(e);
} catch (InvalidTxnStateException e) {
// This exception only occurs when aborting after a commit or vice versa.
@@ -99,12 +101,15 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
+ "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
request,
e);
+ recyclable.ifPresent(Recyclable::close);
request.signalFailedWithKnownReason(e);
} catch (UnknownProducerIdException e) {
LOG.error(
"Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
request,
e);
+ recyclable.ifPresent(Recyclable::close);
+ request.signalFailedWithKnownReason(e);
} catch (Exception e) {
LOG.error(
"Transaction ({}) encountered error and data has been potentially lost.",
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
index 30bbdca7889..8def81a38cf 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.util.TestLoggerExtension;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -85,6 +86,32 @@ public class KafkaCommitterTest {
}
}
+ @Test
+ public void testKafkaCommitterClosesProducer() throws IOException, InterruptedException {
+ Properties properties = getProperties();
+ FlinkKafkaInternalProducer<Object, Object> producer =
+ new FlinkKafkaInternalProducer(properties, TRANSACTIONAL_ID) {
+ @Override
+ public void commitTransaction() throws ProducerFencedException {}
+
+ @Override
+ public void flush() {}
+
+ @Override
+ public void close() {}
+ };
+ try (final KafkaCommitter committer = new KafkaCommitter(properties);
+ Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
+ new Recyclable<>(producer, p -> {})) {
+ final MockCommitRequest<KafkaCommittable> request =
+ new MockCommitRequest<>(
+ new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
+
+ committer.commit(Collections.singletonList(request));
+ assertThat(recyclable.isRecycled()).isTrue();
+ }
+ }
+
Properties getProperties() {
Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:1");