You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/12 14:27:56 UTC

[flink] branch master updated: [FLINK-24773][kafka] Fail job if unhandled exception occurs during committing

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f28bb89  [FLINK-24773][kafka] Fail job if unhandled exception occurs during committing
f28bb89 is described below

commit f28bb89f3520875cb79b2a5ac8fbaab06ca2a547
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Thu Nov 4 16:07:58 2021 +0100

    [FLINK-24773][kafka] Fail job if unhandled exception occurs during committing
    
    Unhandled exceptions during committing usually imply that data is lost.
    We should not only log and continue processing, but should fail the job
    to give the user the choice of what should be done.
---
 .../flink/connector/kafka/sink/KafkaCommitter.java | 10 +++++++++
 .../connector/kafka/sink/KafkaCommitterTest.java   | 25 +++++++++++++++-------
 2 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index b511126..f0b1d24 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
@@ -36,6 +37,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+
 /**
  * Committer implementation for {@link KafkaSink}
  *
@@ -59,6 +62,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
     @Override
     public List<KafkaCommittable> commit(List<KafkaCommittable> committables) throws IOException {
         List<KafkaCommittable> retryableCommittables = new ArrayList<>();
+        Exception collected = null;
         for (KafkaCommittable committable : committables) {
             final String transactionalId = committable.getTransactionalId();
             LOG.debug("Committing Kafka transaction {}", transactionalId);
@@ -109,8 +113,14 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                         "Transaction ({}) encountered error and data has been potentially lost.",
                         committable,
                         e);
+                collected = firstOrSuppressed(e, collected);
             }
             recyclable.ifPresent(Recyclable::close);
+            if (collected != null) {
+                throw new FlinkRuntimeException(
+                        "Some committables were not committed and committing failed with:",
+                        collected);
+            }
         }
         return retryableCommittables;
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
index 6ce4f24..4bf9b96 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
@@ -17,25 +17,30 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Tests for {@link KafkaCommitter}. */
-public class KafkaCommitterTest extends TestLogger {
+@ExtendWith({TestLoggerExtension.class})
+public class KafkaCommitterTest {
 
     private static final int PRODUCER_ID = 0;
     private static final short EPOCH = 0;
@@ -61,19 +66,23 @@ public class KafkaCommitterTest extends TestLogger {
     }
 
     @Test
-    public void testRetryCommittableOnFatalError() throws IOException {
+    public void testFailJobOnUnknownFatalError() {
         Properties properties = getProperties();
         try (final KafkaCommitter committer = new KafkaCommitter(properties);
                 FlinkKafkaInternalProducer<Object, Object> producer =
-                        new FlinkKafkaInternalProducer(properties, TRANSACTIONAL_ID);
+                        new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID);
                 Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
                         new Recyclable<>(producer, p -> {})) {
             final List<KafkaCommittable> committables =
                     Collections.singletonList(
                             new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
             // will fail because transaction not started
-            List<KafkaCommittable> recovered = committer.commit(committables);
-            assertThat(recovered, empty());
+            final FlinkRuntimeException exception =
+                    assertThrows(FlinkRuntimeException.class, () -> committer.commit(committables));
+            assertThat(exception.getCause(), instanceOf(IllegalStateException.class));
+            assertThat(
+                    exception.getCause().getMessage(),
+                    containsString("Transaction was not started"));
             assertThat(recyclable.isRecycled(), equalTo(true));
         }
     }