Posted to commits@flink.apache.org by jq...@apache.org on 2019/08/01 07:08:48 UTC

[flink] 02/03: [FLINK-13226][connector/kafka] Fix deadlock between producer closure and transaction commit in the universal Kafka connector.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc3184a35f2430c94535a095c2f926e912f692bf
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Tue Jul 30 11:52:38 2019 +0200

    [FLINK-13226][connector/kafka] Fix deadlock between producer closure and transaction commit in the universal Kafka connector.
    
    This patch fixes a race condition between the checkpointing thread and the main thread
    that leaves both blocked forever. The sequence causing the deadlock is the following
    (a minimal simulation of the interleaving is sketched after the list):
    
        1. In FlinkKafkaProducer, the main thread encounters a problem and closes all the producers
           to start failover.
        2. The previous checkpoint has completed, so the checkpointing thread grabs the checkpoint
           lock and tries to commit the transaction on a producer that was closed in step 1.
           Due to KAFKA-6635, this commit will never succeed, so the checkpointing thread blocks forever.
        3. In StreamTask, the main thread will eventually try to release all the record writers.
           To do that, it attempts to grab the checkpoint lock, which is held by the checkpointing
           thread from step 2 and will never be released. So the main thread also blocks forever.
    
    KAFKA-6635 has been fixed in Kafka 2.3.0, but Flink 1.9 does not depend on that version yet,
    so we fix the issue on the Flink side first. The solution is to make sure that in
    FlinkKafkaInternalProducer any operation relying on the underlying sender thread to finish
    throws an exception if the producer has been closed.
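
    With this change the failure mode becomes an immediate IllegalStateException rather than a
    hang. For example, mirroring the tests added below (the properties value is assumed to hold
    a valid transactional producer configuration):

        FlinkKafkaInternalProducer<String, String> producer =
                new FlinkKafkaInternalProducer<>(properties);
        producer.initTransactions();
        producer.beginTransaction();
        producer.close();
        producer.commitTransaction(); // throws IllegalStateException instead of blocking forever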
---
 .../kafka/internal/FlinkKafkaInternalProducer.java | 116 ++++++++++++++++-----
 .../kafka/FlinkKafkaInternalProducerITCase.java    |  59 ++++++++++
 2 files changed, 147 insertions(+), 28 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index 916bfc7..78bbb53 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -61,39 +61,63 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	protected final KafkaProducer<K, V> kafkaProducer;
 
+	// This lock and closed flag are introduced to work around KAFKA-6635. Because the bug is only fixed
+	// in Kafka 2.3.0, we need this workaround until the Kafka dependency is bumped to 2.3.0, to avoid a
+	// deadlock between a transaction committing / aborting thread and a producer closing thread.
+	// TODO: remove the workaround after the Kafka dependency is bumped to 2.3.0+
+	private final Object producerClosingLock;
+	private volatile boolean closed;
+
 	@Nullable
 	protected final String transactionalId;
 
 	public FlinkKafkaInternalProducer(Properties properties) {
 		transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
 		kafkaProducer = new KafkaProducer<>(properties);
+		producerClosingLock = new Object();
+		closed = false;
 	}
 
 	// -------------------------------- Simple proxy method calls --------------------------------
 
 	@Override
 	public void initTransactions() {
-		kafkaProducer.initTransactions();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.initTransactions();
+		}
 	}
 
 	@Override
 	public void beginTransaction() throws ProducerFencedException {
-		kafkaProducer.beginTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.beginTransaction();
+		}
 	}
 
 	@Override
 	public void commitTransaction() throws ProducerFencedException {
-		kafkaProducer.commitTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.commitTransaction();
+		}
 	}
 
 	@Override
 	public void abortTransaction() throws ProducerFencedException {
-		kafkaProducer.abortTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.abortTransaction();
+		}
 	}
 
 	@Override
 	public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
-		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		}
 	}
 
 	@Override
@@ -108,7 +132,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public List<PartitionInfo> partitionsFor(String topic) {
-		return kafkaProducer.partitionsFor(topic);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			return kafkaProducer.partitionsFor(topic);
+		}
 	}
 
 	@Override
@@ -118,17 +145,26 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public void close() {
-		kafkaProducer.close();
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close();
+		}
 	}
 
 	@Override
 	public void close(long timeout, TimeUnit unit) {
-		kafkaProducer.close(timeout, unit);
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close(timeout, unit);
+		}
 	}
 
 	@Override
 	public void close(Duration duration) {
-		kafkaProducer.close(duration);
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close(duration);
+		}
 	}
 
 	// -------------------------------- New methods or methods with changed behaviour --------------------------------
@@ -137,7 +173,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 	public void flush() {
 		kafkaProducer.flush();
 		if (transactionalId != null) {
-			flushNewPartitions();
+			synchronized (producerClosingLock) {
+				ensureNotClosed();
+				flushNewPartitions();
+			}
 		}
 	}
 
@@ -148,24 +187,39 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 	 * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
 	 */
 	public void resumeTransaction(long producerId, short epoch) {
-		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
-		LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
-
-		Object transactionManager = getValue(kafkaProducer, "transactionManager");
-		synchronized (transactionManager) {
-			Object nextSequence = getValue(transactionManager, "nextSequence");
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-			invoke(nextSequence, "clear");
-
-			Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
-			setValue(producerIdAndEpoch, "producerId", producerId);
-			setValue(producerIdAndEpoch, "epoch", epoch);
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
-			setValue(transactionManager, "transactionStarted", true);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			Preconditions.checkState(producerId >= 0 && epoch >= 0,
+				"Incorrect values for producerId %s and epoch %s",
+				producerId,
+				epoch);
+			LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}",
+				transactionalId,
+				producerId,
+				epoch);
+
+			Object transactionManager = getValue(kafkaProducer, "transactionManager");
+			synchronized (transactionManager) {
+				Object nextSequence = getValue(transactionManager, "nextSequence");
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+				invoke(nextSequence, "clear");
+
+				Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+				setValue(producerIdAndEpoch, "producerId", producerId);
+				setValue(producerIdAndEpoch, "epoch", epoch);
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+				setValue(transactionManager, "transactionStarted", true);
+			}
 		}
 	}
 
@@ -192,6 +246,12 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 		return node.id();
 	}
 
+	private void ensureNotClosed() {
+		if (closed) {
+			throw new IllegalStateException("The producer has already been closed");
+		}
+	}
+
 	/**
 	 * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also adding new
 	 * partitions to the transaction. flushNewPartitions method is moving this logic to pre-commit/flush, to make
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index d35af10..2d749ba 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -101,6 +101,65 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 		deleteTestTopic(topicName);
 	}
 
+	@Test(timeout = 30000L, expected = IllegalStateException.class)
+	public void testPartitionsForAfterClosed() {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.close();
+		kafkaProducer.partitionsFor("Topic");
+	}
+
+	@Test(timeout = 30000L, expected = IllegalStateException.class)
+	public void testInitTransactionsAfterClosed() {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.close();
+		kafkaProducer.initTransactions();
+	}
+
+	@Test(timeout = 30000L, expected = IllegalStateException.class)
+	public void testBeginTransactionAfterClosed() {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.close();
+		kafkaProducer.beginTransaction();
+	}
+
+	@Test(timeout = 30000L, expected = IllegalStateException.class)
+	public void testCommitTransactionAfterClosed() {
+		String topicName = "testCommitTransactionAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
+		kafkaProducer.commitTransaction();
+	}
+
+	@Test(timeout = 30000L, expected = IllegalStateException.class)
+	public void testResumeTransactionAfterClosed() {
+		String topicName = "testResumeTransactionAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
+		kafkaProducer.resumeTransaction(0L, (short) 1);
+	}
+
+	@Test(timeout = 30000L, expected = IllegalStateException.class)
+	public void testAbortTransactionAfterClosed() {
+		String topicName = "testAbortTransactionAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
+		kafkaProducer.abortTransaction();
+	}
+
+	@Test(timeout = 30000L, expected = IllegalStateException.class)
+	public void testFlushAfterClosed() {
+		String topicName = "testFlushAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
+		kafkaProducer.flush();
+	}
+
+	private FlinkKafkaInternalProducer<String, String> getClosedProducer(String topicName) {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		return kafkaProducer;
+	}
+
 	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
 		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
 			kafkaConsumer.subscribe(Collections.singletonList(topicName));