You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2021/12/10 09:02:23 UTC

[flink] branch release-1.14 updated: [FLINK-25126][kafka] Reset internal transaction state of FlinkKafkaInternalProducer if transaction finalization fails

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new dccd7f0  [FLINK-25126][kafka] Reset internal transaction state of FlinkKafkaInternalProducer if transaction finalization fails
dccd7f0 is described below

commit dccd7f08cde2b13ba4549c94ebbc04ff2c0c5152
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Fri Dec 3 15:07:05 2021 +0100

    [FLINK-25126][kafka] Reset internal transaction state of FlinkKafkaInternalProducer if transaction finalization fails
    
    In the KafkaCommitter we retry transactions if they failed during
    committing. Since we reuse the KafkaProducers we update the used
    transactionalId to continue committing other transactions. To prevent
    accidental overwrites we track the transaction state inside the
    FlinkKafkaInternalProducer.
    Before this change, the state was not reset on a failure during
    transaction finalization, so setting a new transactionalId failed.
    The state is now always reset regardless of whether finalizing the
    transaction (commit, abort) fails.
---
 .../kafka/sink/FlinkKafkaInternalProducer.java     |  8 ++-
 .../sink/FlinkKafkaInternalProducerITCase.java     | 69 +++++++++++++++++-----
 2 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index aec1edf..19eed71 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -86,16 +86,16 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     public void abortTransaction() throws ProducerFencedException {
         LOG.debug("abortTransaction {}", transactionalId);
         checkState(inTransaction, "Transaction was not started");
-        super.abortTransaction();
         inTransaction = false;
+        super.abortTransaction();
     }
 
     @Override
     public void commitTransaction() throws ProducerFencedException {
         LOG.debug("commitTransaction {}", transactionalId);
         checkState(inTransaction, "Transaction was not started");
-        super.commitTransaction();
         inTransaction = false;
+        super.commitTransaction();
     }
 
     public boolean isInTransaction() {
@@ -159,7 +159,9 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
 
     public void setTransactionId(String transactionalId) {
         if (!transactionalId.equals(this.transactionalId)) {
-            checkState(!inTransaction);
+            checkState(
+                    !inTransaction,
+                    String.format("Another transaction %s is still open.", transactionalId));
             LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
             Object transactionManager = getTransactionManager();
             synchronized (transactionManager) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
index 0a68433..cf07311 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
@@ -19,6 +19,8 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -26,9 +28,12 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
@@ -38,6 +43,7 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.kafka.sink.KafkaUtil.createKafkaContainer;
@@ -45,11 +51,11 @@ import static org.apache.flink.util.DockerImageVersions.KAFKA;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 @Testcontainers
 class FlinkKafkaInternalProducerITCase extends TestLogger {
 
-    private static final String TEST_TOPIC = "test-topic";
     private static final Logger LOG =
             LoggerFactory.getLogger(FlinkKafkaInternalProducerITCase.class);
 
@@ -59,28 +65,16 @@ class FlinkKafkaInternalProducerITCase extends TestLogger {
 
     private static final String TRANSACTION_PREFIX = "test-transaction-";
 
-    Properties getProperties() {
-        Properties properties = new Properties();
-        properties.put(
-                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
-                KAFKA_CONTAINER.getBootstrapServers());
-        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
-        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        return properties;
-    }
-
     @Test
     void testInitTransactionId() {
+        final String topic = "test-init-transactions";
         try (FlinkKafkaInternalProducer<String, String> reuse =
                 new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
             int numTransactions = 20;
             for (int i = 1; i <= numTransactions; i++) {
                 reuse.initTransactionId(TRANSACTION_PREFIX + i);
                 reuse.beginTransaction();
-                reuse.send(new ProducerRecord<>(TEST_TOPIC, "test-value-" + i));
+                reuse.send(new ProducerRecord<>(topic, "test-value-" + i));
                 if (i % 2 == 0) {
                     reuse.commitTransaction();
                 } else {
@@ -88,11 +82,54 @@ class FlinkKafkaInternalProducerITCase extends TestLogger {
                     reuse.abortTransaction();
                 }
                 assertNumTransactions(i);
-                assertThat(readRecords(TEST_TOPIC).count(), equalTo(i / 2));
+                assertThat(readRecords(topic).count(), equalTo(i / 2));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideTransactionsFinalizer")
+    void testResetInnerTransactionIfFinalizingTransactionFailed(
+            Consumer<FlinkKafkaInternalProducer<?, ?>> transactionFinalizer) {
+        final String topic = "reset-producer-internal-state";
+        try (FlinkKafkaInternalProducer<String, String> fenced =
+                new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
+            fenced.initTransactions();
+            fenced.beginTransaction();
+            fenced.send(new ProducerRecord<>(topic, "test-value"));
+            // Start a second producer that fences the first one
+            try (FlinkKafkaInternalProducer<String, String> producer =
+                    new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
+                producer.initTransactions();
+                producer.beginTransaction();
+                producer.send(new ProducerRecord<>(topic, "test-value"));
+                producer.commitTransaction();
             }
+            assertThrows(ProducerFencedException.class, () -> transactionFinalizer.accept(fenced));
+            // Internal transaction should be reset and setting a new transactional id is possible
+            fenced.setTransactionId("dummy2");
         }
     }
 
+    private static Properties getProperties() {
+        Properties properties = new Properties();
+        properties.put(
+                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                KAFKA_CONTAINER.getBootstrapServers());
+        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return properties;
+    }
+
+    private static List<Consumer<FlinkKafkaInternalProducer<?, ?>>> provideTransactionsFinalizer() {
+        return Lists.newArrayList(
+                FlinkKafkaInternalProducer::commitTransaction,
+                FlinkKafkaInternalProducer::abortTransaction);
+    }
+
     private void assertNumTransactions(int numTransactions) {
         List<KafkaTransactionLog.TransactionRecord> transactions =
                 new KafkaTransactionLog(getProperties())