Posted to commits@flink.apache.org by jq...@apache.org on 2019/07/25 03:37:02 UTC

[flink] branch FLINK-13266 created (now 61f6ad4)

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a change to branch FLINK-13266
in repository https://gitbox.apache.org/repos/asf/flink.git.


      at 61f6ad4  FLINK-13266: Fix race condition between transaction commit and producer closure.

This branch includes the following new commits:

     new 61f6ad4  FLINK-13266: Fix race condition between transaction commit and producer closure.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] 01/01: FLINK-13266: Fix race condition between transaction commit and producer closure.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch FLINK-13266
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 61f6ad4234dc362701f1e33495132b07d8f165bf
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Jul 24 17:26:27 2019 +0800

    FLINK-13266: Fix race condition between transaction commit and producer closure.
---
 .../kafka/internal/FlinkKafkaProducer.java         | 109 ++++++++++++++-----
 .../connectors/kafka/FlinkKafkaProducerITCase.java |  66 ++++++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java    |   8 +-
 .../kafka/internal/FlinkKafkaInternalProducer.java | 115 ++++++++++++++++-----
 .../kafka/FlinkKafkaInternalProducerITCase.java    |  65 ++++++++++++
 5 files changed, 304 insertions(+), 59 deletions(-)
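
The heart of the change is a close-guard: every transactional call takes a
lock that close() also takes, and a volatile "closed" flag makes calls that
arrive after close() fail fast instead of blocking inside the Kafka client
(KAFKA-6635). A minimal sketch of the pattern, with simplified names rather
than the actual Flink classes:

    // Sketch of the close-guard pattern applied in this commit.
    // Names are simplified; this is not the actual Flink API.
    public class GuardedProducer {

        private final Object closingLock = new Object();
        private volatile boolean closed;

        public void commitTransaction() {
            synchronized (closingLock) {   // never runs concurrently with close()
                ensureNotClosed();         // fail fast instead of deadlocking
                // kafkaProducer.commitTransaction();
            }
        }

        public void close() {
            // Set the flag before blocking on the lock: a commit waiting for
            // the lock will then fail fast rather than run against a producer
            // that is about to be closed.
            closed = true;
            synchronized (closingLock) {
                // kafkaProducer.close();
            }
        }

        private void ensureNotClosed() {
            if (closed) {
                throw new IllegalStateException("The producer has already been closed");
            }
        }
    }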

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index ab4cf52..9f00606 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -107,6 +107,11 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
 
 	private final KafkaProducer<K, V> kafkaProducer;
+	// This lock and closed flag are introduced to work around KAFKA-6635. Because the bug is only fixed in
+	// Kafka 2.3.0, we need this workaround in the 0.11 producer to avoid a deadlock between a transaction
+	// committing / aborting thread and a producer closing thread.
+	private final Object producerClosingLock;
+	private volatile boolean closed;
 
 	@Nullable
 	private final String transactionalId;
@@ -114,33 +119,50 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	public FlinkKafkaProducer(Properties properties) {
 		transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
 		kafkaProducer = new KafkaProducer<>(properties);
+		producerClosingLock = new Object();
+		closed = false;
 	}
 
 	// -------------------------------- Simple proxy method calls --------------------------------
 
 	@Override
 	public void initTransactions() {
-		kafkaProducer.initTransactions();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.initTransactions();
+		}
 	}
 
 	@Override
 	public void beginTransaction() throws ProducerFencedException {
-		kafkaProducer.beginTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.beginTransaction();
+		}
 	}
 
 	@Override
 	public void commitTransaction() throws ProducerFencedException {
-		kafkaProducer.commitTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.commitTransaction();
+		}
 	}
 
 	@Override
 	public void abortTransaction() throws ProducerFencedException {
-		kafkaProducer.abortTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.abortTransaction();
+		}
 	}
 
 	@Override
 	public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
-		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		}
 	}
 
 	@Override
@@ -155,7 +177,10 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public List<PartitionInfo> partitionsFor(String topic) {
-		return kafkaProducer.partitionsFor(topic);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			return kafkaProducer.partitionsFor(topic);
+		}
 	}
 
 	@Override
@@ -165,12 +190,18 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public void close() {
-		kafkaProducer.close();
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close();
+		}
 	}
 
 	@Override
 	public void close(long timeout, TimeUnit unit) {
-		kafkaProducer.close(timeout, unit);
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close(timeout, unit);
+		}
 	}
 
 	// -------------------------------- New methods or methods with changed behaviour --------------------------------
@@ -179,7 +210,10 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	public void flush() {
 		kafkaProducer.flush();
 		if (transactionalId != null) {
-			flushNewPartitions();
+			synchronized (producerClosingLock) {
+				ensureNotClosed();
+				flushNewPartitions();
+			}
 		}
 	}
 
@@ -189,24 +223,39 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	 * {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
 	 */
 	public void resumeTransaction(long producerId, short epoch) {
-		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
-		LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
-
-		Object transactionManager = getValue(kafkaProducer, "transactionManager");
-		synchronized (transactionManager) {
-			Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-			invoke(sequenceNumbers, "clear");
-
-			Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
-			setValue(producerIdAndEpoch, "producerId", producerId);
-			setValue(producerIdAndEpoch, "epoch", epoch);
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
-			setValue(transactionManager, "transactionStarted", true);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			Preconditions.checkState(producerId >= 0 && epoch >= 0,
+				"Incorrect values for producerId %s and epoch %s",
+				producerId,
+				epoch);
+			LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}",
+				transactionalId,
+				producerId,
+				epoch);
+
+			Object transactionManager = getValue(kafkaProducer, "transactionManager");
+			synchronized (transactionManager) {
+				Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+				invoke(sequenceNumbers, "clear");
+
+				Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+				setValue(producerIdAndEpoch, "producerId", producerId);
+				setValue(producerIdAndEpoch, "epoch", epoch);
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+				setValue(transactionManager, "transactionStarted", true);
+			}
 		}
 	}
 
@@ -234,6 +283,12 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 		return node.id();
 	}
 
+	private void ensureNotClosed() {
+		if (closed) {
+			throw new IllegalStateException("The producer has already been closed");
+		}
+	}
+
 	/**
 	 * Besides committing, {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} also adds new
 	 * partitions to the transaction. The flushNewPartitions method moves this logic to pre-commit/flush, to make
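
The resumeTransaction hunk above drives the Kafka client's internal
TransactionManager through reflection. The getValue / setValue / invoke /
getEnum helpers it calls are pre-existing utilities in this class; for
orientation only, getValue plausibly looks like this (an assumption, not
part of this commit):

    // Assumed shape of the pre-existing reflection helper; illustrative only.
    // Requires: import java.lang.reflect.Field;
    private static Object getValue(Object object, String fieldName) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(object);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }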
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index f39d93d..7febd3d 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -101,6 +101,62 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		deleteTestTopic(topicName);
 	}
 
+	@Test(timeout = 30000L)
+	public void testPartitionsForAfterClosed() throws Exception {
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.close();
+		assertThrows(() -> kafkaProducer.partitionsFor("Topic"), IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testInitTransactionsAfterClosed() throws Exception {
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::initTransactions, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testBeginTransactionAfterClosed() throws Exception {
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::beginTransaction, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testCommitTransactionAfterClosed() throws Exception {
+		String topicName = "testCommitTransactionAfterClosed";
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::commitTransaction, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testAbortOrResumeTransactionAfterClosed() throws Exception {
+		String topicName = "testAbortOrResumeTransactionAfterClosed";
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::abortTransaction, IllegalStateException.class);
+		assertThrows(() -> kafkaProducer.resumeTransaction(0L, (short) 1), IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testFlushAfterClosed() throws Exception {
+		String topicName = "testFlushAfterClosed";
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::flush, IllegalStateException.class);
+	}
+
 	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
 		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
 			kafkaConsumer.subscribe(Collections.singletonList(topicName));
@@ -111,4 +167,14 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			assertEquals(expectedValue, record.value());
 		}
 	}
+
+	private void assertThrows(Runnable action, Class<? extends Exception> expectedException) throws Exception {
+		try {
+			action.run();
+			// Fail if the action completes without throwing anything at all.
+			throw new AssertionError("Expected " + expectedException.getSimpleName() + " to be thrown.");
+		} catch (Exception e) {
+			if (!expectedException.isAssignableFrom(e.getClass())) {
+				throw e;
+			}
+		}
+	}
 }
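
The new tests pin down the fail-fast behaviour after close(); the deadlock
itself is timing-dependent and hard to assert directly. For illustration
only (not part of the commit), the racy sequence the lock prevents looks
like this:

    // Sketch: pre-fix, these two calls could deadlock inside the Kafka
    // client (KAFKA-6635). With the workaround, close() waits for the
    // commit, or the commit fails fast with IllegalStateException.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.submit(kafkaProducer::commitTransaction);
    pool.submit(kafkaProducer::close);
    pool.shutdown();
    pool.awaitTermination(30, TimeUnit.SECONDS);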
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1582922..56efb1e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -2207,12 +2207,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		}
 	}
 
-	private abstract static class TestDeserializer implements
+	private abstract static class AbstractTestDeserializer implements
 			KafkaDeserializationSchema<Tuple3<Integer, Integer, String>> {
 
 		protected final TypeSerializer<Tuple2<Integer, Integer>> ts;
 
-		public TestDeserializer(ExecutionConfig ec) {
+		public AbstractTestDeserializer(ExecutionConfig ec) {
 			ts = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}).createSerializer(ec);
 		}
 
@@ -2234,7 +2234,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		}
 	}
 
-	private static class Tuple2WithTopicSchema extends TestDeserializer
+	private static class Tuple2WithTopicSchema extends AbstractTestDeserializer
 			implements KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
 
 		public Tuple2WithTopicSchema(ExecutionConfig ec) {
@@ -2264,7 +2264,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		}
 	}
 
-	private static class TestDeSerializer extends TestDeserializer
+	private static class TestDeSerializer extends AbstractTestDeserializer
 			implements KafkaSerializationSchema<Tuple3<Integer, Integer, String>> {
 
 		public TestDeSerializer(ExecutionConfig ec) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index 916bfc7..82553ad 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -61,39 +61,63 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	protected final KafkaProducer<K, V> kafkaProducer;
 
+	// This lock and closed flag are introduced to work around KAFKA-6635. Because the bug is only fixed in
+	// Kafka 2.3.0, we need this workaround until the Kafka dependency is bumped to 2.3.0 to avoid a deadlock
+	// between a transaction committing / aborting thread and a producer closing thread.
+	// TODO: remove this workaround once the Kafka dependency is bumped to 2.3.0+
+	private final Object producerClosingLock;
+	private volatile boolean closed;
+
 	@Nullable
 	protected final String transactionalId;
 
 	public FlinkKafkaInternalProducer(Properties properties) {
 		transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
 		kafkaProducer = new KafkaProducer<>(properties);
+		producerClosingLock = new Object();
+		closed = false;
 	}
 
 	// -------------------------------- Simple proxy method calls --------------------------------
 
 	@Override
 	public void initTransactions() {
-		kafkaProducer.initTransactions();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.initTransactions();
+		}
 	}
 
 	@Override
 	public void beginTransaction() throws ProducerFencedException {
-		kafkaProducer.beginTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.beginTransaction();
+		}
 	}
 
 	@Override
 	public void commitTransaction() throws ProducerFencedException {
-		kafkaProducer.commitTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.commitTransaction();
+		}
 	}
 
 	@Override
 	public void abortTransaction() throws ProducerFencedException {
-		kafkaProducer.abortTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.abortTransaction();
+		}
 	}
 
 	@Override
 	public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
-		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		}
 	}
 
 	@Override
@@ -108,7 +132,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public List<PartitionInfo> partitionsFor(String topic) {
-		return kafkaProducer.partitionsFor(topic);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			return kafkaProducer.partitionsFor(topic);
+		}
 	}
 
 	@Override
@@ -118,17 +145,26 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public void close() {
-		kafkaProducer.close();
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close();
+		}
 	}
 
 	@Override
 	public void close(long timeout, TimeUnit unit) {
-		kafkaProducer.close(timeout, unit);
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close(timeout, unit);
+		}
 	}
 
 	@Override
 	public void close(Duration duration) {
-		kafkaProducer.close(duration);
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close(duration);
+		}
 	}
 
 	// -------------------------------- New methods or methods with changed behaviour --------------------------------
@@ -137,7 +173,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 	public void flush() {
 		kafkaProducer.flush();
 		if (transactionalId != null) {
-			flushNewPartitions();
+			synchronized (producerClosingLock) {
+				ensureNotClosed();
+				flushNewPartitions();
+			}
 		}
 	}
 
@@ -148,24 +187,38 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 	 * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
 	 */
 	public void resumeTransaction(long producerId, short epoch) {
-		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
-		LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
-
-		Object transactionManager = getValue(kafkaProducer, "transactionManager");
-		synchronized (transactionManager) {
-			Object nextSequence = getValue(transactionManager, "nextSequence");
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-			invoke(nextSequence, "clear");
-
-			Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
-			setValue(producerIdAndEpoch, "producerId", producerId);
-			setValue(producerIdAndEpoch, "epoch", epoch);
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
-			setValue(transactionManager, "transactionStarted", true);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			Preconditions.checkState(producerId >= 0 && epoch >= 0,
+				"Incorrect values for producerId %s and epoch %s",
+				producerId,
+				epoch);
+			LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}",
+				transactionalId,
+				producerId,
+				epoch);
+
+			Object transactionManager = getValue(kafkaProducer, "transactionManager");
+			synchronized (transactionManager) {
+				Object nextSequence = getValue(transactionManager, "nextSequence");
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+				invoke(nextSequence, "clear");
+
+				Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+				setValue(producerIdAndEpoch, "producerId", producerId);
+				setValue(producerIdAndEpoch, "epoch", epoch);
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+				setValue(transactionManager, "transactionStarted", true);
+			}
 		}
 	}
 
@@ -192,6 +245,12 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 		return node.id();
 	}
 
+	private void ensureNotClosed() {
+		if (closed) {
+			throw new IllegalStateException("The producer has already been closed");
+		}
+	}
+
 	/**
 	 * Besides committing, {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} also adds new
 	 * partitions to the transaction. The flushNewPartitions method moves this logic to pre-commit/flush, to make
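
For context, resumeTransaction exists so that Flink's exactly-once Kafka
sink can commit a transaction that was left open before a failure. A
hypothetical recovery flow, assuming the producerId and epoch were stored
in checkpointed state (variable names here are illustrative):

    // Hypothetical recovery flow; the checkpointed* names are assumptions.
    FlinkKafkaInternalProducer<byte[], byte[]> producer =
        new FlinkKafkaInternalProducer<>(producerProperties);
    producer.resumeTransaction(checkpointedProducerId, checkpointedEpoch);
    producer.commitTransaction(); // commit the pre-failure transaction
    producer.close();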
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index d35af10..2ea8cd4 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -101,6 +101,71 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 		deleteTestTopic(topicName);
 	}
 
+	@Test(timeout = 30000L)
+	public void testPartitionsForAfterClosed() throws Exception {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.close();
+		assertThrows(() -> kafkaProducer.partitionsFor("Topic"), IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testInitTransactionsAfterClosed() throws Exception {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::initTransactions, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testBeginTransactionAfterClosed() throws Exception {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::beginTransaction, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testCommitTransactionAfterClosed() throws Exception {
+		String topicName = "testCommitTransactionAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::commitTransaction, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testAbortTransactionAfterClosed() throws Exception {
+		String topicName = "testAbortTransactionAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::abortTransaction, IllegalStateException.class);
+		assertThrows(() -> kafkaProducer.resumeTransaction(0L, (short) 1), IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testFlushAfterClosed() throws Exception {
+		String topicName = "testFlushAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::flush, IllegalStateException.class);
+	}
+
+	private void assertThrows(Runnable action, Class<? extends Exception> expectedException) throws Exception {
+		try {
+			action.run();
+			// Fail if the action completes without throwing anything at all.
+			throw new AssertionError("Expected " + expectedException.getSimpleName() + " to be thrown.");
+		} catch (Exception e) {
+			if (!expectedException.isAssignableFrom(e.getClass())) {
+				throw e;
+			}
+		}
+	}
+
 	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
 		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
 			kafkaConsumer.subscribe(Collections.singletonList(topicName));