Posted to commits@flink.apache.org by al...@apache.org on 2020/06/15 17:14:52 UTC
[flink] 03/04: [FLINK-17327] Always use close() with zero timeout in exactly-once Kafka Producer
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7d4041250dfefeb919b5d2854e993434e8b68401
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Mon May 11 11:32:37 2020 +0200
[FLINK-17327] Always use close() with zero timeout in exactly-once Kafka Producer
NOTE: This fix does not work without also bumping the Kafka client version to
something > 2.4 (2.3 might work as well), because of KAFKA-6635/KAFKA-7763.
Calling close() without a timeout is equivalent to calling
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). This will leave lingering Kafka
threads on shutdown, which eventually cause the TaskManager to be killed by
the Flink task watchdog.
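For illustration (this snippet is not part of the patch; it assumes a
KafkaProducer built from an already configured Properties object "props",
and Kafka clients >= 2.0 for the Duration overload of close()):

    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
    // Graceful close: waits up to Long.MAX_VALUE ms for in-flight requests,
    // so an unreachable broker leaves the sender thread lingering:
    //     producer.close();
    // Hard close: returns immediately, force-closes the sender, and leaves
    // an open transaction untouched on the broker:
    producer.close(Duration.ofSeconds(0));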
We also forbid calling close() without a timeout on our internal
Producer, which means that we have to change some code that uses
try-with-resources, because try-with-resources implicitly calls the
no-arg close().
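The replacement pattern, repeated throughout the patch, is an explicit
try/finally that closes with a zero timeout, roughly:

    FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
    try {
        producer = initTransactionalProducer(transactionalId, false);
        producer.initTransactions();
    } finally {
        if (producer != null) {
            // zero timeout: do not block, do not wait for in-flight requests
            producer.close(Duration.ofSeconds(0));
        }
    }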
We need to call close() with a zero timeout to prevent in-flight
transactions from being aborted by the KafkaProducer/sender; aborting
them would break how we use transactions in our Kafka Producer.
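Where a producer is recycled rather than discarded, the patch pairs a
flush() with the zero-timeout close, as in recycleTransactionalProducer():

    availableTransactionalIds.add(producer.getTransactionalId());
    producer.flush();                      // push out buffered records first,
                                           // since close(0) will not wait for them
    producer.close(Duration.ofSeconds(0)); // tear down without aborting the
                                           // transaction on the broker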
We don't update FlinkKafkaProducerBase, which is used for
non-exactly-once Kafka Producers.
---
.../connectors/kafka/FlinkKafkaProducer011.java | 7 ++-
.../connectors/kafka/FlinkKafkaProducer.java | 57 ++++++++++++++++------
.../kafka/internal/FlinkKafkaInternalProducer.java | 11 +----
.../kafka/FlinkKafkaInternalProducerITCase.java | 35 +++++++++----
4 files changed, 74 insertions(+), 36 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index f8642f9..ee99e56 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -83,6 +83,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -675,7 +676,8 @@ public class FlinkKafkaProducer011<IN>
break;
case AT_LEAST_ONCE:
case NONE:
- currentTransaction.producer.close();
+ currentTransaction.producer.flush();
+ currentTransaction.producer.close(0, TimeUnit.SECONDS);
break;
}
}
@@ -959,7 +961,8 @@ public class FlinkKafkaProducer011<IN>
private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
availableTransactionalIds.add(producer.getTransactionalId());
- producer.close();
+ producer.flush();
+ producer.close(0, TimeUnit.SECONDS);
}
private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index c2193e5..82a3fac 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -47,7 +47,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartiti
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.IOUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -71,6 +70,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -84,6 +84,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -877,7 +878,8 @@ public class FlinkKafkaProducer<IN>
break;
case AT_LEAST_ONCE:
case NONE:
- currentTransaction.producer.close();
+ currentTransaction.producer.flush();
+ currentTransaction.producer.close(Duration.ofSeconds(0));
break;
}
}
@@ -888,12 +890,20 @@ public class FlinkKafkaProducer<IN>
// We may have to close producer of the current transaction in case some exception was thrown before
// the normal close routine finishes.
if (currentTransaction() != null) {
- IOUtils.closeQuietly(currentTransaction().producer);
+ try {
+ currentTransaction().producer.close(Duration.ofSeconds(0));
+ } catch (Throwable t) {
+ LOG.warn("Error closing producer.", t);
+ }
}
// Make sure all the producers for pending transactions are closed.
- pendingTransactions().forEach(transaction ->
- IOUtils.closeQuietly(transaction.getValue().producer)
- );
+ pendingTransactions().forEach(transaction -> {
+ try {
+ transaction.getValue().producer.close(Duration.ofSeconds(0));
+ } catch (Throwable t) {
+ LOG.warn("Error closing producer.", t);
+ }
+ });
// make sure we propagate pending errors
checkErroneous();
}
@@ -950,9 +960,10 @@ public class FlinkKafkaProducer<IN>
@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
- try (
- FlinkKafkaInternalProducer<byte[], byte[]> producer =
- initTransactionalProducer(transaction.transactionalId, false)) {
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+ try {
+ producer =
+ initTransactionalProducer(transaction.transactionalId, false);
producer.resumeTransaction(transaction.producerId, transaction.epoch);
producer.commitTransaction();
} catch (InvalidTxnStateException | ProducerFencedException ex) {
@@ -961,6 +972,10 @@ public class FlinkKafkaProducer<IN>
"Presumably this transaction has been already committed before",
ex,
transaction);
+ } finally {
+ if (producer != null) {
+ producer.close(0, TimeUnit.SECONDS);
+ }
}
}
}
@@ -976,10 +991,15 @@ public class FlinkKafkaProducer<IN>
@Override
protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
- try (
- FlinkKafkaInternalProducer<byte[], byte[]> producer =
- initTransactionalProducer(transaction.transactionalId, false)) {
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+ try {
+ producer =
+ initTransactionalProducer(transaction.transactionalId, false);
producer.initTransactions();
+ } finally {
+ if (producer != null) {
+ producer.close(0, TimeUnit.SECONDS);
+ }
}
}
}
@@ -1146,10 +1166,16 @@ public class FlinkKafkaProducer<IN>
final Properties myConfig = new Properties();
myConfig.putAll(producerConfig);
initTransactionalProducerConfig(myConfig, transactionalId);
- try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
- new FlinkKafkaInternalProducer<>(myConfig)) {
+ FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
+ try {
+ kafkaProducer =
+ new FlinkKafkaInternalProducer<>(myConfig);
// it suffices to call initTransactions - this will abort any lingering transactions
kafkaProducer.initTransactions();
+ } finally {
+ if (kafkaProducer != null) {
+ kafkaProducer.close(Duration.ofSeconds(0));
+ }
}
}
});
@@ -1182,7 +1208,8 @@ public class FlinkKafkaProducer<IN>
private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
availableTransactionalIds.add(producer.getTransactionalId());
- producer.close();
+ producer.flush();
+ producer.close(Duration.ofSeconds(0));
}
private FlinkKafkaInternalProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index e4e7b5c..bfb965c 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -147,16 +147,7 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
@Override
public void close() {
- synchronized (producerClosingLock) {
- kafkaProducer.close();
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Closed internal KafkaProducer {}. Stacktrace: {}",
- System.identityHashCode(this),
- Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
- }
- closed = true;
- }
+ throw new UnsupportedOperationException("Close without timeout is not allowed because it can leave lingering Kafka threads.");
}
@Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index 5062b70..53fb781 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -33,6 +33,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
@@ -79,11 +80,15 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
@Test(timeout = 30000L)
public void testHappyPath() throws IOException {
String topicName = "flink-kafka-producer-happy-path";
- try (Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+
+ Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ try {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
kafkaProducer.commitTransaction();
+ } finally {
+ kafkaProducer.close(Duration.ofSeconds(5));
}
assertRecord(topicName, "42", "42");
deleteTestTopic(topicName);
@@ -92,7 +97,8 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
@Test(timeout = 30000L)
public void testResumeTransaction() throws IOException {
String topicName = "flink-kafka-producer-resume-transaction";
- try (FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+ FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ try {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
@@ -100,9 +106,12 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
long producerId = kafkaProducer.getProducerId();
short epoch = kafkaProducer.getEpoch();
- try (FlinkKafkaInternalProducer<String, String> resumeProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+ FlinkKafkaInternalProducer<String, String> resumeProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ try {
resumeProducer.resumeTransaction(producerId, epoch);
resumeProducer.commitTransaction();
+ } finally {
+ resumeProducer.close(Duration.ofSeconds(5));
}
assertRecord(topicName, "42", "42");
@@ -111,10 +120,15 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
kafkaProducer.commitTransaction();
// this shouldn't fail also, for same reason as above
- try (FlinkKafkaInternalProducer<String, String> resumeProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+ resumeProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ try {
resumeProducer.resumeTransaction(producerId, epoch);
resumeProducer.commitTransaction();
+ } finally {
+ resumeProducer.close(Duration.ofSeconds(5));
}
+ } finally {
+ kafkaProducer.close(Duration.ofSeconds(5));
}
deleteTestTopic(topicName);
}
@@ -122,14 +136,14 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
@Test(timeout = 30000L, expected = IllegalStateException.class)
public void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
- kafkaProducer.close();
+ kafkaProducer.close(Duration.ofSeconds(5));
kafkaProducer.partitionsFor("Topic");
}
@Test(timeout = 30000L, expected = IllegalStateException.class)
public void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
- kafkaProducer.close();
+ kafkaProducer.close(Duration.ofSeconds(5));
kafkaProducer.initTransactions();
}
@@ -137,7 +151,7 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
public void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
- kafkaProducer.close();
+ kafkaProducer.close(Duration.ofSeconds(5));
kafkaProducer.beginTransaction();
}
@@ -174,12 +188,15 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
String topic = "flink-kafka-producer-txn-coordinator-changed";
createTestTopic(topic, 1, 2);
- try (Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+ Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ try {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
kafkaProducer.flush();
kafkaProducer.commitTransaction();
+ } finally {
+ kafkaProducer.close(Duration.ofSeconds(5));
}
deleteTestTopic(topic);
}
@@ -189,7 +206,7 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
- kafkaProducer.close();
+ kafkaProducer.close(Duration.ofSeconds(5));
return kafkaProducer;
}