You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/06/15 17:14:52 UTC

[flink] 03/04: [FLINK-17327] Always use close() with zero timeout in exactly-once Kafka Producer

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d4041250dfefeb919b5d2854e993434e8b68401
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Mon May 11 11:32:37 2020 +0200

    [FLINK-17327] Always use close() with zero timeout in exactly-once Kafka Producer
    
    NOTE: This fix does not work without also bumping the Kafka version to
    something > 2.4 (2.3 might work as well), because of KAFKA-6635/KAFKA-7763.
    
    Calling close() without timeout is equivalent to calling
    close(Long.MAX_VALUE). This will leave lingering Kafka threads on
    shutdown, which eventually cause the Task Manager to be killed by the
    Flink Task watchdog.
    
    We also forbid calling close() without a timeout on our internal
    Producer, which means that we have to change some code that uses
    try-with-resources, because this calls close() without a timeout.
    
    We need to call close with a zero timeout to prevent in-flight
    transactions from being aborted by the KafkaProducer/sender. This would
    break how we use transactions in our Kafka Producer.
    
    We don't update FlinkKafkaProducerBase, which is used for
    non-exactly-once Kafka Producers.
---
 .../connectors/kafka/FlinkKafkaProducer011.java    |  7 ++-
 .../connectors/kafka/FlinkKafkaProducer.java       | 57 ++++++++++++++++------
 .../kafka/internal/FlinkKafkaInternalProducer.java | 11 +----
 .../kafka/FlinkKafkaInternalProducerITCase.java    | 35 +++++++++----
 4 files changed, 74 insertions(+), 36 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index f8642f9..ee99e56 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -83,6 +83,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -675,7 +676,8 @@ public class FlinkKafkaProducer011<IN>
 						break;
 					case AT_LEAST_ONCE:
 					case NONE:
-						currentTransaction.producer.close();
+						currentTransaction.producer.flush();
+						currentTransaction.producer.close(0, TimeUnit.SECONDS);
 						break;
 				}
 			}
@@ -959,7 +961,8 @@ public class FlinkKafkaProducer011<IN>
 
 	private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
 		availableTransactionalIds.add(producer.getTransactionalId());
-		producer.close();
+		producer.flush();
+		producer.close(0, TimeUnit.SECONDS);
 	}
 
 	private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index c2193e5..82a3fac 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -47,7 +47,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartiti
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 
@@ -71,6 +70,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -84,6 +84,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -877,7 +878,8 @@ public class FlinkKafkaProducer<IN>
 						break;
 					case AT_LEAST_ONCE:
 					case NONE:
-						currentTransaction.producer.close();
+						currentTransaction.producer.flush();
+						currentTransaction.producer.close(Duration.ofSeconds(0));
 						break;
 				}
 			}
@@ -888,12 +890,20 @@ public class FlinkKafkaProducer<IN>
 			// We may have to close producer of the current transaction in case some exception was thrown before
 			// the normal close routine finishes.
 			if (currentTransaction() != null) {
-				IOUtils.closeQuietly(currentTransaction().producer);
+				try {
+					currentTransaction().producer.close(Duration.ofSeconds(0));
+				} catch (Throwable t) {
+					LOG.warn("Error closing producer.", t);
+				}
 			}
 			// Make sure all the producers for pending transactions are closed.
-			pendingTransactions().forEach(transaction ->
-					IOUtils.closeQuietly(transaction.getValue().producer)
-			);
+			pendingTransactions().forEach(transaction -> {
+						try {
+							transaction.getValue().producer.close(Duration.ofSeconds(0));
+						} catch (Throwable t) {
+							LOG.warn("Error closing producer.", t);
+						}
+					});
 			// make sure we propagate pending errors
 			checkErroneous();
 		}
@@ -950,9 +960,10 @@ public class FlinkKafkaProducer<IN>
 	@Override
 	protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
 		if (transaction.isTransactional()) {
-			try (
-				FlinkKafkaInternalProducer<byte[], byte[]> producer =
-					initTransactionalProducer(transaction.transactionalId, false)) {
+			FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+			try {
+				producer =
+					initTransactionalProducer(transaction.transactionalId, false);
 				producer.resumeTransaction(transaction.producerId, transaction.epoch);
 				producer.commitTransaction();
 			} catch (InvalidTxnStateException | ProducerFencedException ex) {
@@ -961,6 +972,10 @@ public class FlinkKafkaProducer<IN>
 						"Presumably this transaction has been already committed before",
 					ex,
 					transaction);
+			} finally {
+				if (producer != null) {
+					producer.close(0, TimeUnit.SECONDS);
+				}
 			}
 		}
 	}
@@ -976,10 +991,15 @@ public class FlinkKafkaProducer<IN>
 	@Override
 	protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction) {
 		if (transaction.isTransactional()) {
-			try (
-				FlinkKafkaInternalProducer<byte[], byte[]> producer =
-					initTransactionalProducer(transaction.transactionalId, false)) {
+			FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+			try {
+				producer =
+						initTransactionalProducer(transaction.transactionalId, false);
 				producer.initTransactions();
+			} finally {
+				if (producer != null) {
+					producer.close(0, TimeUnit.SECONDS);
+				}
 			}
 		}
 	}
@@ -1146,10 +1166,16 @@ public class FlinkKafkaProducer<IN>
 				final Properties myConfig = new Properties();
 				myConfig.putAll(producerConfig);
 				initTransactionalProducerConfig(myConfig, transactionalId);
-				try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
-						new FlinkKafkaInternalProducer<>(myConfig)) {
+				FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
+				try {
+					kafkaProducer =
+							new FlinkKafkaInternalProducer<>(myConfig);
 					// it suffices to call initTransactions - this will abort any lingering transactions
 					kafkaProducer.initTransactions();
+				} finally {
+					if (kafkaProducer != null) {
+						kafkaProducer.close(Duration.ofSeconds(0));
+					}
 				}
 			}
 		});
@@ -1182,7 +1208,8 @@ public class FlinkKafkaProducer<IN>
 
 	private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
 		availableTransactionalIds.add(producer.getTransactionalId());
-		producer.close();
+		producer.flush();
+		producer.close(Duration.ofSeconds(0));
 	}
 
 	private FlinkKafkaInternalProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index e4e7b5c..bfb965c 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -147,16 +147,7 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public void close() {
-		synchronized (producerClosingLock) {
-			kafkaProducer.close();
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(
-						"Closed internal KafkaProducer {}. Stacktrace: {}",
-						System.identityHashCode(this),
-						Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
-			}
-			closed = true;
-		}
+		throw new UnsupportedOperationException("Close without timeout is not allowed because it can leave lingering Kafka threads.");
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index 5062b70..53fb781 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -33,6 +33,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.UUID;
@@ -79,11 +80,15 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 	@Test(timeout = 30000L)
 	public void testHappyPath() throws IOException {
 		String topicName = "flink-kafka-producer-happy-path";
-		try (Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+
+		Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		try {
 			kafkaProducer.initTransactions();
 			kafkaProducer.beginTransaction();
 			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
 			kafkaProducer.commitTransaction();
+		} finally {
+			kafkaProducer.close(Duration.ofSeconds(5));
 		}
 		assertRecord(topicName, "42", "42");
 		deleteTestTopic(topicName);
@@ -92,7 +97,8 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 	@Test(timeout = 30000L)
 	public void testResumeTransaction() throws IOException {
 		String topicName = "flink-kafka-producer-resume-transaction";
-		try (FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		try {
 			kafkaProducer.initTransactions();
 			kafkaProducer.beginTransaction();
 			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
@@ -100,9 +106,12 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 			long producerId = kafkaProducer.getProducerId();
 			short epoch = kafkaProducer.getEpoch();
 
-			try (FlinkKafkaInternalProducer<String, String> resumeProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+			FlinkKafkaInternalProducer<String, String> resumeProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+			try {
 				resumeProducer.resumeTransaction(producerId, epoch);
 				resumeProducer.commitTransaction();
+			} finally {
+				resumeProducer.close(Duration.ofSeconds(5));
 			}
 
 			assertRecord(topicName, "42", "42");
@@ -111,10 +120,15 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 			kafkaProducer.commitTransaction();
 
 			// this shouldn't fail also, for same reason as above
-			try (FlinkKafkaInternalProducer<String, String> resumeProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+			resumeProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+			try {
 				resumeProducer.resumeTransaction(producerId, epoch);
 				resumeProducer.commitTransaction();
+			} finally {
+				resumeProducer.close(Duration.ofSeconds(5));
 			}
+		} finally {
+			kafkaProducer.close(Duration.ofSeconds(5));
 		}
 		deleteTestTopic(topicName);
 	}
@@ -122,14 +136,14 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 	@Test(timeout = 30000L, expected = IllegalStateException.class)
 	public void testPartitionsForAfterClosed() {
 		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
-		kafkaProducer.close();
+		kafkaProducer.close(Duration.ofSeconds(5));
 		kafkaProducer.partitionsFor("Topic");
 	}
 
 	@Test(timeout = 30000L, expected = IllegalStateException.class)
 	public void testInitTransactionsAfterClosed() {
 		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
-		kafkaProducer.close();
+		kafkaProducer.close(Duration.ofSeconds(5));
 		kafkaProducer.initTransactions();
 	}
 
@@ -137,7 +151,7 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 	public void testBeginTransactionAfterClosed() {
 		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
 		kafkaProducer.initTransactions();
-		kafkaProducer.close();
+		kafkaProducer.close(Duration.ofSeconds(5));
 		kafkaProducer.beginTransaction();
 	}
 
@@ -174,12 +188,15 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 	public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
 		String topic = "flink-kafka-producer-txn-coordinator-changed";
 		createTestTopic(topic, 1, 2);
-		try (Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+		Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		try {
 			kafkaProducer.initTransactions();
 			kafkaProducer.beginTransaction();
 			restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
 			kafkaProducer.flush();
 			kafkaProducer.commitTransaction();
+		} finally {
+			kafkaProducer.close(Duration.ofSeconds(5));
 		}
 		deleteTestTopic(topic);
 	}
@@ -189,7 +206,7 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 		kafkaProducer.initTransactions();
 		kafkaProducer.beginTransaction();
 		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
-		kafkaProducer.close();
+		kafkaProducer.close(Duration.ofSeconds(5));
 		return kafkaProducer;
 	}