You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2019/07/25 03:37:03 UTC

[flink] 01/01: FLINK-13266: Fix race condition between transaction commit and producer closure.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch FLINK-13266
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 61f6ad4234dc362701f1e33495132b07d8f165bf
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Jul 24 17:26:27 2019 +0800

    FLINK-13266: Fix race condition between transaction commit and producer closure.
---
 .../kafka/internal/FlinkKafkaProducer.java         | 109 ++++++++++++++-----
 .../connectors/kafka/FlinkKafkaProducerITCase.java |  66 ++++++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java    |   8 +-
 .../kafka/internal/FlinkKafkaInternalProducer.java | 115 ++++++++++++++++-----
 .../kafka/FlinkKafkaInternalProducerITCase.java    |  65 ++++++++++++
 5 files changed, 304 insertions(+), 59 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index ab4cf52..9f00606 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -107,6 +107,11 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
 
 	private final KafkaProducer<K, V> kafkaProducer;
+	// This lock and closed flag are introduced to workaround KAFKA-6635. Because the bug is only fixed in
+	// Kafka 2.3.0, we need this workaround in 0.11 producer to avoid deadlock between a transaction
+	// committing / aborting thread and a producer closing thread.
+	private final Object producerClosingLock;
+	private volatile boolean closed;
 
 	@Nullable
 	private final String transactionalId;
@@ -114,33 +119,50 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	public FlinkKafkaProducer(Properties properties) {
 		transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
 		kafkaProducer = new KafkaProducer<>(properties);
+		producerClosingLock = new Object();
+		closed = false;
 	}
 
 	// -------------------------------- Simple proxy method calls --------------------------------
 
 	@Override
 	public void initTransactions() {
-		kafkaProducer.initTransactions();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.initTransactions();
+		}
 	}
 
 	@Override
 	public void beginTransaction() throws ProducerFencedException {
-		kafkaProducer.beginTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.beginTransaction();
+		}
 	}
 
 	@Override
 	public void commitTransaction() throws ProducerFencedException {
-		kafkaProducer.commitTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.commitTransaction();
+		}
 	}
 
 	@Override
 	public void abortTransaction() throws ProducerFencedException {
-		kafkaProducer.abortTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.abortTransaction();
+		}
 	}
 
 	@Override
 	public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
-		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		}
 	}
 
 	@Override
@@ -155,7 +177,10 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public List<PartitionInfo> partitionsFor(String topic) {
-		return kafkaProducer.partitionsFor(topic);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			return kafkaProducer.partitionsFor(topic);
+		}
 	}
 
 	@Override
@@ -165,12 +190,18 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public void close() {
-		kafkaProducer.close();
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close();
+		}
 	}
 
 	@Override
 	public void close(long timeout, TimeUnit unit) {
-		kafkaProducer.close(timeout, unit);
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close(timeout, unit);
+		}
 	}
 
 	// -------------------------------- New methods or methods with changed behaviour --------------------------------
@@ -179,7 +210,10 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	public void flush() {
 		kafkaProducer.flush();
 		if (transactionalId != null) {
-			flushNewPartitions();
+			synchronized (producerClosingLock) {
+				ensureNotClosed();
+				flushNewPartitions();
+			}
 		}
 	}
 
@@ -189,24 +223,39 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	 * {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
 	 */
 	public void resumeTransaction(long producerId, short epoch) {
-		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
-		LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
-
-		Object transactionManager = getValue(kafkaProducer, "transactionManager");
-		synchronized (transactionManager) {
-			Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-			invoke(sequenceNumbers, "clear");
-
-			Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
-			setValue(producerIdAndEpoch, "producerId", producerId);
-			setValue(producerIdAndEpoch, "epoch", epoch);
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
-			setValue(transactionManager, "transactionStarted", true);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			Preconditions.checkState(producerId >= 0 && epoch >= 0,
+				"Incorrect values for producerId %s and epoch %s",
+				producerId,
+				epoch);
+			LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}",
+				transactionalId,
+				producerId,
+				epoch);
+
+			Object transactionManager = getValue(kafkaProducer, "transactionManager");
+			synchronized (transactionManager) {
+				Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+				invoke(sequenceNumbers, "clear");
+
+				Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+				setValue(producerIdAndEpoch, "producerId", producerId);
+				setValue(producerIdAndEpoch, "epoch", epoch);
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+				setValue(transactionManager, "transactionStarted", true);
+			}
 		}
 	}
 
@@ -234,6 +283,12 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 		return node.id();
 	}
 
+	private void ensureNotClosed() {
+		if (closed) {
+			throw new IllegalStateException("The producer has already been closed");
+		}
+	}
+
 	/**
 	 * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also adding new
 	 * partitions to the transaction. flushNewPartitions method is moving this logic to pre-commit/flush, to make
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index f39d93d..7febd3d 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -101,6 +101,62 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		deleteTestTopic(topicName);
 	}
 
+	@Test(timeout = 30000L)
+	public void testPartitionsForAfterClosed() throws Exception {
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.close();
+		assertThrows(() -> kafkaProducer.partitionsFor("Topic"), IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testInitTransactionsAfterClosed() throws Exception {
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::initTransactions, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testBeginTransactionAfterClosed() throws Exception {
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::beginTransaction, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testCommitTransactionAfterClosed() throws Exception {
+		String topicName = "testCommitTransactionAfterClosed";
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::commitTransaction, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testAbortOrResumeTransactionAfterClosed() throws Exception {
+		String topicName = "testCommitTransactionAfterClosed";
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::abortTransaction, IllegalStateException.class);
+		assertThrows(() -> kafkaProducer.resumeTransaction(0L, (short) 1), IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testFlushAfterClosed() throws Exception {
+		String topicName = "testCommitTransactionAfterClosed";
+		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::flush, IllegalStateException.class);
+	}
+
 	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
 		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
 			kafkaConsumer.subscribe(Collections.singletonList(topicName));
@@ -111,4 +167,14 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			assertEquals(expectedValue, record.value());
 		}
 	}
+
+	private void assertThrows(Runnable action, Class<IllegalStateException> expectedException) throws Exception {
+		try {
+			action.run();
+		} catch (Exception e) {
+			if (!expectedException.isAssignableFrom(e.getClass())) {
+				throw e;
+			}
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1582922..56efb1e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -2207,12 +2207,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		}
 	}
 
-	private abstract static class TestDeserializer implements
+	private abstract static class AbstractTestDeserializer implements
 			KafkaDeserializationSchema<Tuple3<Integer, Integer, String>> {
 
 		protected final TypeSerializer<Tuple2<Integer, Integer>> ts;
 
-		public TestDeserializer(ExecutionConfig ec) {
+		public AbstractTestDeserializer(ExecutionConfig ec) {
 			ts = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}).createSerializer(ec);
 		}
 
@@ -2234,7 +2234,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		}
 	}
 
-	private static class Tuple2WithTopicSchema extends TestDeserializer
+	private static class Tuple2WithTopicSchema extends AbstractTestDeserializer
 			implements KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
 
 		public Tuple2WithTopicSchema(ExecutionConfig ec) {
@@ -2264,7 +2264,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		}
 	}
 
-	private static class TestDeSerializer extends TestDeserializer
+	private static class TestDeSerializer extends AbstractTestDeserializer
 			implements KafkaSerializationSchema<Tuple3<Integer, Integer, String>> {
 
 		public TestDeSerializer(ExecutionConfig ec) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index 916bfc7..82553ad 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -61,39 +61,63 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	protected final KafkaProducer<K, V> kafkaProducer;
 
+	// This lock and closed flag are introduced to workaround KAFKA-6635. Because the bug is only fixed in
+	// Kafka 2.3.0, we need this workaround before Kafka dependency is bumped to 2.3.0 to avoid deadlock
+	// between a transaction committing / aborting thread and a producer closing thread.
+	// TODO: remove the workaround after Kafka dependency is bumped to 2.3.0+
+	private final Object producerClosingLock;
+	private volatile boolean closed;
+
 	@Nullable
 	protected final String transactionalId;
 
 	public FlinkKafkaInternalProducer(Properties properties) {
 		transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
 		kafkaProducer = new KafkaProducer<>(properties);
+		producerClosingLock = new Object();
+		closed = false;
 	}
 
 	// -------------------------------- Simple proxy method calls --------------------------------
 
 	@Override
 	public void initTransactions() {
-		kafkaProducer.initTransactions();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.initTransactions();
+		}
 	}
 
 	@Override
 	public void beginTransaction() throws ProducerFencedException {
-		kafkaProducer.beginTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.beginTransaction();
+		}
 	}
 
 	@Override
 	public void commitTransaction() throws ProducerFencedException {
-		kafkaProducer.commitTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.commitTransaction();
+		}
 	}
 
 	@Override
 	public void abortTransaction() throws ProducerFencedException {
-		kafkaProducer.abortTransaction();
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.abortTransaction();
+		}
 	}
 
 	@Override
 	public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
-		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+		}
 	}
 
 	@Override
@@ -108,7 +132,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public List<PartitionInfo> partitionsFor(String topic) {
-		return kafkaProducer.partitionsFor(topic);
+		synchronized (producerClosingLock) {
+			ensureNotClosed();
+			return kafkaProducer.partitionsFor(topic);
+		}
 	}
 
 	@Override
@@ -118,17 +145,26 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
 	@Override
 	public void close() {
-		kafkaProducer.close();
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close();
+		}
 	}
 
 	@Override
 	public void close(long timeout, TimeUnit unit) {
-		kafkaProducer.close(timeout, unit);
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close(timeout, unit);
+		}
 	}
 
 	@Override
 	public void close(Duration duration) {
-		kafkaProducer.close(duration);
+		closed = true;
+		synchronized (producerClosingLock) {
+			kafkaProducer.close(duration);
+		}
 	}
 
 	// -------------------------------- New methods or methods with changed behaviour --------------------------------
@@ -137,7 +173,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 	public void flush() {
 		kafkaProducer.flush();
 		if (transactionalId != null) {
-			flushNewPartitions();
+			synchronized (producerClosingLock) {
+				ensureNotClosed();
+				flushNewPartitions();
+			}
 		}
 	}
 
@@ -148,24 +187,38 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 	 * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
 	 */
 	public void resumeTransaction(long producerId, short epoch) {
-		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
-		LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
-
-		Object transactionManager = getValue(kafkaProducer, "transactionManager");
-		synchronized (transactionManager) {
-			Object nextSequence = getValue(transactionManager, "nextSequence");
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-			invoke(nextSequence, "clear");
-
-			Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
-			setValue(producerIdAndEpoch, "producerId", producerId);
-			setValue(producerIdAndEpoch, "epoch", epoch);
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
-
-			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
-			setValue(transactionManager, "transactionStarted", true);
+		synchronized (producerClosingLock) {
+			Preconditions.checkState(producerId >= 0 && epoch >= 0,
+				"Incorrect values for producerId %s and epoch %s",
+				producerId,
+				epoch);
+			LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}",
+				transactionalId,
+				producerId,
+				epoch);
+
+			Object transactionManager = getValue(kafkaProducer, "transactionManager");
+			synchronized (transactionManager) {
+				Object nextSequence = getValue(transactionManager, "nextSequence");
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+				invoke(nextSequence, "clear");
+
+				Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+				setValue(producerIdAndEpoch, "producerId", producerId);
+				setValue(producerIdAndEpoch, "epoch", epoch);
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+				invoke(transactionManager,
+					"transitionTo",
+					getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+				setValue(transactionManager, "transactionStarted", true);
+			}
 		}
 	}
 
@@ -192,6 +245,12 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 		return node.id();
 	}
 
+	private void ensureNotClosed() {
+		if (closed) {
+			throw new IllegalStateException("The producer has already been closed");
+		}
+	}
+
 	/**
 	 * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also adding new
 	 * partitions to the transaction. flushNewPartitions method is moving this logic to pre-commit/flush, to make
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index d35af10..2ea8cd4 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -101,6 +101,71 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 		deleteTestTopic(topicName);
 	}
 
+	@Test(timeout = 30000L)
+	public void testPartitionsForAfterClosed() throws Exception {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.close();
+		assertThrows(() -> kafkaProducer.partitionsFor("Topic"), IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testInitTransactionsAfterClosed() throws Exception {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::initTransactions, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testBeginTransactionAfterClosed() throws Exception {
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::beginTransaction, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testCommitTransactionAfterClosed() throws Exception {
+		String topicName = "testCommitTransactionAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::commitTransaction, IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testAbortTransactionAfterClosed() throws Exception {
+		String topicName = "testCommitTransactionAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(() -> kafkaProducer.resumeTransaction(0L, (short) 1), IllegalStateException.class);
+	}
+
+	@Test(timeout = 30000L)
+	public void testFlushAfterClosed() throws Exception {
+		String topicName = "testCommitTransactionAfterClosed";
+		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+		kafkaProducer.initTransactions();
+		kafkaProducer.beginTransaction();
+		kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+		kafkaProducer.close();
+		assertThrows(kafkaProducer::flush, IllegalStateException.class);
+	}
+
+	private void assertThrows(Runnable action, Class<IllegalStateException> expectedException) throws Exception {
+		try {
+			action.run();
+		} catch (Exception e) {
+			if (!expectedException.isAssignableFrom(e.getClass())) {
+				throw e;
+			}
+		}
+	}
+
 	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
 		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
 			kafkaConsumer.subscribe(Collections.singletonList(topicName));