You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2019/07/25 03:37:03 UTC
[flink] 01/01: FLINK-13266: Fix race condition between transaction
commit and producer closure.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch FLINK-13266
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 61f6ad4234dc362701f1e33495132b07d8f165bf
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Jul 24 17:26:27 2019 +0800
FLINK-13266: Fix race condition between transaction commit and producer closure.
---
.../kafka/internal/FlinkKafkaProducer.java | 109 ++++++++++++++-----
.../connectors/kafka/FlinkKafkaProducerITCase.java | 66 ++++++++++++
.../connectors/kafka/KafkaConsumerTestBase.java | 8 +-
.../kafka/internal/FlinkKafkaInternalProducer.java | 115 ++++++++++++++++-----
.../kafka/FlinkKafkaInternalProducerITCase.java | 65 ++++++++++++
5 files changed, 304 insertions(+), 59 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index ab4cf52..9f00606 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -107,6 +107,11 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
private final KafkaProducer<K, V> kafkaProducer;
+ // This lock and closed flag are introduced to work around KAFKA-6635. Because the bug is only fixed in
+ // Kafka 2.3.0, we need this workaround in the 0.11 producer to avoid a deadlock between a transaction
+ // committing / aborting thread and a producer closing thread.
+ private final Object producerClosingLock;
+ private volatile boolean closed;
@Nullable
private final String transactionalId;
@@ -114,33 +119,50 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
public FlinkKafkaProducer(Properties properties) {
transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
kafkaProducer = new KafkaProducer<>(properties);
+ producerClosingLock = new Object();
+ closed = false;
}
// -------------------------------- Simple proxy method calls --------------------------------
@Override
public void initTransactions() {
- kafkaProducer.initTransactions();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.initTransactions();
+ }
}
@Override
public void beginTransaction() throws ProducerFencedException {
- kafkaProducer.beginTransaction();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.beginTransaction();
+ }
}
@Override
public void commitTransaction() throws ProducerFencedException {
- kafkaProducer.commitTransaction();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.commitTransaction();
+ }
}
@Override
public void abortTransaction() throws ProducerFencedException {
- kafkaProducer.abortTransaction();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.abortTransaction();
+ }
}
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
- kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+ }
}
@Override
@@ -155,7 +177,10 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- return kafkaProducer.partitionsFor(topic);
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ return kafkaProducer.partitionsFor(topic);
+ }
}
@Override
@@ -165,12 +190,18 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
@Override
public void close() {
- kafkaProducer.close();
+ closed = true;
+ synchronized (producerClosingLock) {
+ kafkaProducer.close();
+ }
}
@Override
public void close(long timeout, TimeUnit unit) {
- kafkaProducer.close(timeout, unit);
+ closed = true;
+ synchronized (producerClosingLock) {
+ kafkaProducer.close(timeout, unit);
+ }
}
// -------------------------------- New methods or methods with changed behaviour --------------------------------
@@ -179,7 +210,10 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
public void flush() {
kafkaProducer.flush();
if (transactionalId != null) {
- flushNewPartitions();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ flushNewPartitions();
+ }
}
}
@@ -189,24 +223,39 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
* {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
*/
public void resumeTransaction(long producerId, short epoch) {
- Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
- LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
-
- Object transactionManager = getValue(kafkaProducer, "transactionManager");
- synchronized (transactionManager) {
- Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
-
- invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
- invoke(sequenceNumbers, "clear");
-
- Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
- setValue(producerIdAndEpoch, "producerId", producerId);
- setValue(producerIdAndEpoch, "epoch", epoch);
-
- invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
-
- invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
- setValue(transactionManager, "transactionStarted", true);
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ Preconditions.checkState(producerId >= 0 && epoch >= 0,
+ "Incorrect values for producerId %s and epoch %s",
+ producerId,
+ epoch);
+ LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}",
+ transactionalId,
+ producerId,
+ epoch);
+
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+ synchronized (transactionManager) {
+ Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+ invoke(sequenceNumbers, "clear");
+
+ Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+ setValue(producerIdAndEpoch, "producerId", producerId);
+ setValue(producerIdAndEpoch, "epoch", epoch);
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+ setValue(transactionManager, "transactionStarted", true);
+ }
}
}
@@ -234,6 +283,12 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
return node.id();
}
+ private void ensureNotClosed() {
+ if (closed) {
+ throw new IllegalStateException("The producer has already been closed");
+ }
+ }
+
/**
* Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also adding new
* partitions to the transaction. flushNewPartitions method is moving this logic to pre-commit/flush, to make
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index f39d93d..7febd3d 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -101,6 +101,62 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
deleteTestTopic(topicName);
}
+ @Test(timeout = 30000L)
+ public void testPartitionsForAfterClosed() throws Exception {
+ FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+ kafkaProducer.close();
+ assertThrows(() -> kafkaProducer.partitionsFor("Topic"), IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testInitTransactionsAfterClosed() throws Exception {
+ FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+ kafkaProducer.close();
+ assertThrows(kafkaProducer::initTransactions, IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testBeginTransactionAfterClosed() throws Exception {
+ FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+ kafkaProducer.initTransactions();
+ kafkaProducer.close();
+ assertThrows(kafkaProducer::beginTransaction, IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testCommitTransactionAfterClosed() throws Exception {
+ String topicName = "testCommitTransactionAfterClosed";
+ FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+ kafkaProducer.close();
+ assertThrows(kafkaProducer::commitTransaction, IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testAbortOrResumeTransactionAfterClosed() throws Exception {
+ String topicName = "testCommitTransactionAfterClosed";
+ FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+ kafkaProducer.close();
+ assertThrows(kafkaProducer::abortTransaction, IllegalStateException.class);
+ assertThrows(() -> kafkaProducer.resumeTransaction(0L, (short) 1), IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testFlushAfterClosed() throws Exception {
+ String topicName = "testCommitTransactionAfterClosed";
+ FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+ kafkaProducer.close();
+ assertThrows(kafkaProducer::flush, IllegalStateException.class);
+ }
+
private void assertRecord(String topicName, String expectedKey, String expectedValue) {
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
kafkaConsumer.subscribe(Collections.singletonList(topicName));
@@ -111,4 +167,14 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
assertEquals(expectedValue, record.value());
}
}
+
+ private void assertThrows(Runnable action, Class<IllegalStateException> expectedException) throws Exception {
+ try {
+ action.run();
+ } catch (Exception e) {
+ if (!expectedException.isAssignableFrom(e.getClass())) {
+ throw e;
+ }
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1582922..56efb1e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -2207,12 +2207,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
}
}
- private abstract static class TestDeserializer implements
+ private abstract static class AbstractTestDeserializer implements
KafkaDeserializationSchema<Tuple3<Integer, Integer, String>> {
protected final TypeSerializer<Tuple2<Integer, Integer>> ts;
- public TestDeserializer(ExecutionConfig ec) {
+ public AbstractTestDeserializer(ExecutionConfig ec) {
ts = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}).createSerializer(ec);
}
@@ -2234,7 +2234,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
}
}
- private static class Tuple2WithTopicSchema extends TestDeserializer
+ private static class Tuple2WithTopicSchema extends AbstractTestDeserializer
implements KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
public Tuple2WithTopicSchema(ExecutionConfig ec) {
@@ -2264,7 +2264,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
}
}
- private static class TestDeSerializer extends TestDeserializer
+ private static class TestDeSerializer extends AbstractTestDeserializer
implements KafkaSerializationSchema<Tuple3<Integer, Integer, String>> {
public TestDeSerializer(ExecutionConfig ec) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index 916bfc7..82553ad 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -61,39 +61,63 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
protected final KafkaProducer<K, V> kafkaProducer;
+ // This lock and closed flag are introduced to work around KAFKA-6635. Because the bug is only fixed in
+ // Kafka 2.3.0, we need this workaround until the Kafka dependency is bumped to 2.3.0, to avoid a deadlock
+ // between a transaction committing / aborting thread and a producer closing thread.
+ // TODO: remove the workaround after the Kafka dependency is bumped to 2.3.0+
+ private final Object producerClosingLock;
+ private volatile boolean closed;
+
@Nullable
protected final String transactionalId;
public FlinkKafkaInternalProducer(Properties properties) {
transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
kafkaProducer = new KafkaProducer<>(properties);
+ producerClosingLock = new Object();
+ closed = false;
}
// -------------------------------- Simple proxy method calls --------------------------------
@Override
public void initTransactions() {
- kafkaProducer.initTransactions();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.initTransactions();
+ }
}
@Override
public void beginTransaction() throws ProducerFencedException {
- kafkaProducer.beginTransaction();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.beginTransaction();
+ }
}
@Override
public void commitTransaction() throws ProducerFencedException {
- kafkaProducer.commitTransaction();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.commitTransaction();
+ }
}
@Override
public void abortTransaction() throws ProducerFencedException {
- kafkaProducer.abortTransaction();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.abortTransaction();
+ }
}
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
- kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+ }
}
@Override
@@ -108,7 +132,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- return kafkaProducer.partitionsFor(topic);
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ return kafkaProducer.partitionsFor(topic);
+ }
}
@Override
@@ -118,17 +145,26 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
@Override
public void close() {
- kafkaProducer.close();
+ closed = true;
+ synchronized (producerClosingLock) {
+ kafkaProducer.close();
+ }
}
@Override
public void close(long timeout, TimeUnit unit) {
- kafkaProducer.close(timeout, unit);
+ closed = true;
+ synchronized (producerClosingLock) {
+ kafkaProducer.close(timeout, unit);
+ }
}
@Override
public void close(Duration duration) {
- kafkaProducer.close(duration);
+ closed = true;
+ synchronized (producerClosingLock) {
+ kafkaProducer.close(duration);
+ }
}
// -------------------------------- New methods or methods with changed behaviour --------------------------------
@@ -137,7 +173,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
public void flush() {
kafkaProducer.flush();
if (transactionalId != null) {
- flushNewPartitions();
+ synchronized (producerClosingLock) {
+ ensureNotClosed();
+ flushNewPartitions();
+ }
}
}
@@ -148,24 +187,38 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
* https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
*/
public void resumeTransaction(long producerId, short epoch) {
- Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
- LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
-
- Object transactionManager = getValue(kafkaProducer, "transactionManager");
- synchronized (transactionManager) {
- Object nextSequence = getValue(transactionManager, "nextSequence");
-
- invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
- invoke(nextSequence, "clear");
-
- Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
- setValue(producerIdAndEpoch, "producerId", producerId);
- setValue(producerIdAndEpoch, "epoch", epoch);
-
- invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
-
- invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
- setValue(transactionManager, "transactionStarted", true);
+ synchronized (producerClosingLock) {
+ Preconditions.checkState(producerId >= 0 && epoch >= 0,
+ "Incorrect values for producerId %s and epoch %s",
+ producerId,
+ epoch);
+ LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}",
+ transactionalId,
+ producerId,
+ epoch);
+
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+ synchronized (transactionManager) {
+ Object nextSequence = getValue(transactionManager, "nextSequence");
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+ invoke(nextSequence, "clear");
+
+ Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+ setValue(producerIdAndEpoch, "producerId", producerId);
+ setValue(producerIdAndEpoch, "epoch", epoch);
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+ setValue(transactionManager, "transactionStarted", true);
+ }
}
}
@@ -192,6 +245,12 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
return node.id();
}
+ private void ensureNotClosed() {
+ if (closed) {
+ throw new IllegalStateException("The producer has already been closed");
+ }
+ }
+
/**
* Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also adding new
* partitions to the transaction. flushNewPartitions method is moving this logic to pre-commit/flush, to make
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index d35af10..2ea8cd4 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -101,6 +101,71 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
deleteTestTopic(topicName);
}
+ @Test(timeout = 30000L)
+ public void testPartitionsForAfterClosed() throws Exception {
+ FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ kafkaProducer.close();
+ assertThrows(() -> kafkaProducer.partitionsFor("Topic"), IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testInitTransactionsAfterClosed() throws Exception {
+ FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ kafkaProducer.close();
+ assertThrows(kafkaProducer::initTransactions, IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testBeginTransactionAfterClosed() throws Exception {
+ FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ kafkaProducer.initTransactions();
+ kafkaProducer.close();
+ assertThrows(kafkaProducer::beginTransaction, IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testCommitTransactionAfterClosed() throws Exception {
+ String topicName = "testCommitTransactionAfterClosed";
+ FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+ kafkaProducer.close();
+ assertThrows(kafkaProducer::commitTransaction, IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testAbortTransactionAfterClosed() throws Exception {
+ String topicName = "testCommitTransactionAfterClosed";
+ FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+ kafkaProducer.close();
+ assertThrows(() -> kafkaProducer.resumeTransaction(0L, (short) 1), IllegalStateException.class);
+ }
+
+ @Test(timeout = 30000L)
+ public void testFlushAfterClosed() throws Exception {
+ String topicName = "testCommitTransactionAfterClosed";
+ FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+ kafkaProducer.close();
+ assertThrows(kafkaProducer::flush, IllegalStateException.class);
+ }
+
+ private void assertThrows(Runnable action, Class<IllegalStateException> expectedException) throws Exception {
+ try {
+ action.run();
+ } catch (Exception e) {
+ if (!expectedException.isAssignableFrom(e.getClass())) {
+ throw e;
+ }
+ }
+ }
+
private void assertRecord(String topicName, String expectedKey, String expectedValue) {
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
kafkaConsumer.subscribe(Collections.singletonList(topicName));