Posted to commits@flink.apache.org by al...@apache.org on 2017/11/23 13:44:40 UTC

[1/3] flink git commit: [hotfix][kafka] Remove unused method in kafka tests

Repository: flink
Updated Branches:
  refs/heads/master ccf917de2 -> 458c909ca


[hotfix][kafka] Remove unused method in kafka tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/458c909c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/458c909c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/458c909c

Branch: refs/heads/master
Commit: 458c909caf6f3ab1a6ed90c2508eacf686d1d101
Parents: f214e7d
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Nov 22 15:55:20 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 23 14:44:18 2017 +0100

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011ITCase.java          | 16 ----------------
 1 file changed, 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/458c909c/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index a32c7f8..85735c8 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -29,12 +29,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-
 import kafka.server.KafkaServer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.Before;
 import org.junit.Test;
@@ -553,17 +548,6 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		}
 	}
 
-	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
-		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
-			kafkaConsumer.subscribe(Collections.singletonList(topicName));
-			ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
-
-			ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
-			assertEquals(expectedKey, record.key());
-			assertEquals(expectedValue, record.value());
-		}
-	}
-
 	private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
 		try {
 			autoCloseable.close();


[3/3] flink git commit: [hotfix][kafka] Throw FlinkKafka011Exception with error codes instead of generic Exception

Posted by al...@apache.org.
[hotfix][kafka] Throw FlinkKafka011Exception with error codes instead of generic Exception


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ac32c59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ac32c59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ac32c59

Branch: refs/heads/master
Commit: 2ac32c596bfaa833beefefd8de85c504e2d8d623
Parents: ccf917d
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Nov 22 11:37:48 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 23 14:44:18 2017 +0100

----------------------------------------------------------------------
 .../kafka/FlinkKafka011ErrorCode.java           | 26 ++++++++++++
 .../kafka/FlinkKafka011Exception.java           | 42 ++++++++++++++++++++
 .../connectors/kafka/FlinkKafkaProducer011.java | 22 +++++-----
 3 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
new file mode 100644
index 0000000..4f5de4f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+/**
+ * Error codes used in {@link FlinkKafka011Exception}.
+ */
+public enum FlinkKafka011ErrorCode {
+	PRODUCERS_POOL_EMPTY,
+	EXTERNAL_ERROR
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
new file mode 100644
index 0000000..6b16e53
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception used by {@link FlinkKafkaProducer011} and {@link FlinkKafkaConsumer011}.
+ */
+public class FlinkKafka011Exception extends FlinkException {
+
+	private final FlinkKafka011ErrorCode errorCode;
+
+	public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message) {
+		super(message);
+		this.errorCode = errorCode;
+	}
+
+	public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message, Throwable cause) {
+		super(message, cause);
+		this.errorCode = errorCode;
+	}
+
+	public FlinkKafka011ErrorCode getErrorCode() {
+		return errorCode;
+	}
+}
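
Since the error code is exposed via getErrorCode(), callers can branch on the failure cause instead of matching message strings. A hypothetical caller-side sketch (the handle() helper and its recovery policy are assumptions for illustration, not part of this commit):

	import org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception;

	public class ErrorCodeHandlingSketch {

		// Hypothetical policy: treat an exhausted producers pool as a
		// configuration problem worth a targeted hint; rethrow everything else.
		static void handle(FlinkKafka011Exception e) throws FlinkKafka011Exception {
			switch (e.getErrorCode()) {
				case PRODUCERS_POOL_EMPTY:
					System.err.println(
						"Hint: raise the producers pool size or lower the number of concurrent checkpoints.");
					break;
				case EXTERNAL_ERROR:
				default:
					throw e;
			}
		}
	}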

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 6b0136d..0c741f5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -551,7 +551,7 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	@Override
-	public void invoke(KafkaTransactionState transaction, IN next, Context context) throws Exception {
+	public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
 		checkErroneous();
 
 		byte[] serializedKey = schema.serializeKey(next);
@@ -587,7 +587,7 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	@Override
-	public void close() throws Exception {
+	public void close() throws FlinkKafka011Exception {
 		final KafkaTransactionState currentTransaction = currentTransaction();
 		if (currentTransaction != null) {
 			// to avoid exceptions on aborting transactions with some pending records
@@ -612,7 +612,7 @@ public class FlinkKafkaProducer011<IN>
 	// ------------------- Logic for handling checkpoint flushing -------------------------- //
 
 	@Override
-	protected KafkaTransactionState beginTransaction() throws Exception {
+	protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				FlinkKafkaProducer<byte[], byte[]> producer = createOrGetProducerFromPool();
@@ -631,12 +631,13 @@ public class FlinkKafkaProducer011<IN>
 		}
 	}
 
-	private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws Exception {
+	private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws FlinkKafka011Exception {
 		FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll();
 		if (producer == null) {
 			String transactionalId = availableTransactionalIds.poll();
 			if (transactionalId == null) {
-				throw new Exception(
+				throw new FlinkKafka011Exception(
+					FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
 					"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
 			}
 			producer = initTransactionalProducer(transactionalId, true);
@@ -646,7 +647,7 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	@Override
-	protected void preCommit(KafkaTransactionState transaction) throws Exception {
+	protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
 		switch (semantic) {
 			case EXACTLY_ONCE:
 			case AT_LEAST_ONCE:
@@ -740,7 +741,7 @@ public class FlinkKafkaProducer011<IN>
 	 * Flush pending records.
 	 * @param transaction the transaction whose pending records should be flushed
 	 */
-	private void flush(KafkaTransactionState transaction) throws Exception {
+	private void flush(KafkaTransactionState transaction) throws FlinkKafka011Exception {
 		if (transaction.producer != null) {
 			transaction.producer.flush();
 		}
@@ -936,12 +937,15 @@ public class FlinkKafkaProducer011<IN>
 		return producer;
 	}
 
-	private void checkErroneous() throws Exception {
+	private void checkErroneous() throws FlinkKafka011Exception {
 		Exception e = asyncException;
 		if (e != null) {
 			// prevent double throwing
 			asyncException = null;
-			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+			throw new FlinkKafka011Exception(
+				FlinkKafka011ErrorCode.EXTERNAL_ERROR,
+				"Failed to send data to Kafka: " + e.getMessage(),
+				e);
 		}
 	}
 


[2/3] flink git commit: [FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

Posted by al...@apache.org.
[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

The previously faulty scenario, with a producer pool of 2:

1. started transaction 1 (txn1) with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with producerB, wrote record 43
3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
4. checkpoint 2 triggered, pre-committing txn2; started txn3 with producerA, wrote record 44
5. crash...
6. recovered to checkpoint 1; txn1 from producerA is found in "pendingCommitTransactions", so recoverAndCommit(txn1) is attempted
7. unfortunately, txn1 and txn3 come from the same producer and are identical from the Kafka broker's perspective, so txn3 is committed as well

The result is that both records 42 and 44 are committed.

With this fix, the producer is re-initialized for each checkpoint, so after re-initialization txn3 has different producerId/epoch counters than txn1.
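
This relies on standard Kafka 0.11 transaction fencing: initTransactions() obtains fresh producerId/epoch counters from the transaction coordinator and fences off any earlier producer instance that used the same transactional.id. A minimal standalone sketch of that mechanism (the broker address, transactional.id, topic, and payload below are illustrative assumptions, not taken from the commit):

	import java.util.Properties;

	import org.apache.kafka.clients.producer.KafkaProducer;
	import org.apache.kafka.clients.producer.ProducerRecord;

	public class TransactionFencingSketch {
		public static void main(String[] args) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
			props.put("transactional.id", "flink-sink-0"); // same id as some earlier producer instance
			props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
			props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

			try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
				// Obtains a new producerId/epoch from the transaction coordinator and
				// fences off any previous producer that used the same transactional.id.
				// This is why a transaction from a re-initialized producer cannot be
				// confused with one left over from an earlier checkpoint.
				producer.initTransactions();

				producer.beginTransaction();
				producer.send(new ProducerRecord<>("example-topic", "42".getBytes()));
				producer.commitTransaction();
			}
		}
	}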


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f214e7d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f214e7d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f214e7d8

Branch: refs/heads/master
Commit: f214e7d8ed5a38a8b56f12cf8d7bc7dd0ba31189
Parents: 2ac32c5
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Nov 22 15:53:08 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 23 14:44:18 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java | 85 ++++++--------------
 .../kafka/FlinkKafkaProducer011ITCase.java      | 65 +++++++++++++++
 2 files changed, 91 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f214e7d8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0c741f5..b14e487 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -223,16 +222,11 @@ public class FlinkKafkaProducer011<IN>
 	private final int kafkaProducersPoolSize;
 
 	/**
-	 * Available transactional ids.
+	 * Pool of available transactional ids.
 	 */
 	private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
 
 	/**
-	 * Pool of KafkaProducers objects.
-	 */
-	private transient Optional<ProducersPool> producersPool = Optional.empty();
-
-	/**
 	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
 	 */
 	private boolean writeTimestampToKafka = false;
@@ -599,12 +593,6 @@ public class FlinkKafkaProducer011<IN>
 		catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
 		}
-		try {
-			producersPool.ifPresent(pool -> pool.close());
-		}
-		catch (Exception e) {
-			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
-		}
 		// make sure we propagate pending errors
 		checkErroneous();
 	}
@@ -615,7 +603,7 @@ public class FlinkKafkaProducer011<IN>
 	protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
 		switch (semantic) {
 			case EXACTLY_ONCE:
-				FlinkKafkaProducer<byte[], byte[]> producer = createOrGetProducerFromPool();
+				FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
 				producer.beginTransaction();
 				return new KafkaTransactionState(producer.getTransactionalId(), producer);
 			case AT_LEAST_ONCE:
@@ -631,21 +619,6 @@ public class FlinkKafkaProducer011<IN>
 		}
 	}
 
-	private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws FlinkKafka011Exception {
-		FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll();
-		if (producer == null) {
-			String transactionalId = availableTransactionalIds.poll();
-			if (transactionalId == null) {
-				throw new FlinkKafka011Exception(
-					FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
-					"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
-			}
-			producer = initTransactionalProducer(transactionalId, true);
-			producer.initTransactions();
-		}
-		return producer;
-	}
-
 	@Override
 	protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
 		switch (semantic) {
@@ -666,7 +639,7 @@ public class FlinkKafkaProducer011<IN>
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				transaction.producer.commitTransaction();
-				getProducersPool().add(transaction.producer);
+				recycleTransactionalProducer(transaction.producer);
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:
@@ -706,7 +679,7 @@ public class FlinkKafkaProducer011<IN>
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				transaction.producer.abortTransaction();
-				getProducersPool().add(transaction.producer);
+				recycleTransactionalProducer(transaction.producer);
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:
@@ -796,10 +769,7 @@ public class FlinkKafkaProducer011<IN>
 
 		if (semantic != Semantic.EXACTLY_ONCE) {
 			nextTransactionalIdHint = null;
-			producersPool = Optional.empty();
 		} else {
-			producersPool = Optional.of(new ProducersPool());
-
 			ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
 			if (transactionalIdHints.size() > 1) {
 				throw new IllegalStateException(
@@ -883,16 +853,33 @@ public class FlinkKafkaProducer011<IN>
 		return currentTransaction.producer.getTransactionCoordinatorId();
 	}
 
+	/**
+	 * For each checkpoint we create a new {@link FlinkKafkaProducer} so that new transactions will not clash
+	 * with transactions created during previous checkpoints ({@code producer.initTransactions()} ensures that we
+	 * obtain new producerId and epoch counters).
+	 */
+	private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafka011Exception {
+		String transactionalId = availableTransactionalIds.poll();
+		if (transactionalId == null) {
+			throw new FlinkKafka011Exception(
+				FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
+				"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
+		}
+		FlinkKafkaProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
+		producer.initTransactions();
+		return producer;
+	}
+
+	private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
+		availableTransactionalIds.add(producer.getTransactionalId());
+		producer.close();
+	}
+
 	private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
 		producerConfig.put("transactional.id", transactionalId);
 		return initProducer(registerMetrics);
 	}
 
-	private ProducersPool getProducersPool() {
-		checkState(producersPool.isPresent(), "Trying to access uninitialized producer pool");
-		return producersPool.get();
-	}
-
 	private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
 		FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);
 
@@ -951,7 +938,6 @@ public class FlinkKafkaProducer011<IN>
 
 	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
-		producersPool = Optional.empty();
 	}
 
 	private static Properties getPropertiesFromBrokerList(String brokerList) {
@@ -1264,25 +1250,6 @@ public class FlinkKafkaProducer011<IN>
 		}
 	}
 
-	static class ProducersPool implements Closeable {
-		private final LinkedBlockingDeque<FlinkKafkaProducer<byte[], byte[]>> pool = new LinkedBlockingDeque<>();
-
-		public void add(FlinkKafkaProducer<byte[], byte[]> producer) {
-			pool.add(producer);
-		}
-
-		public FlinkKafkaProducer<byte[], byte[]> poll() {
-			return pool.poll();
-		}
-
-		@Override
-		public void close() {
-			while (!pool.isEmpty()) {
-				pool.poll().close();
-			}
-		}
-	}
-
 	/**
 	 * Keep information required to deduce next safe to use transactional id.
 	 */

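Note the design choice in the change above: the old ProducersPool of live producers is gone, and only transactional ids are pooled; each checkpoint builds a brand-new producer for an id and closes it again on recycle, so producerId/epoch counters are never reused. A hypothetical standalone model of that strategy (class and method names are illustrative, not from the commit):

	import java.util.concurrent.BlockingDeque;
	import java.util.concurrent.LinkedBlockingDeque;

	// Models pooling cheap transactional ids instead of live producers.
	public class IdPoolSketch {
		private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();

		IdPoolSketch(int poolSize) {
			for (int i = 0; i < poolSize; i++) {
				availableTransactionalIds.add("txn-id-" + i);
			}
		}

		String acquire() {
			String id = availableTransactionalIds.poll();
			if (id == null) {
				throw new IllegalStateException("Too many ongoing snapshots.");
			}
			return id; // the caller creates a brand-new producer for this id
		}

		void recycle(String id) {
			availableTransactionalIds.add(id); // only the id survives; the producer is closed
		}
	}
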
http://git-wip-us.apache.org/repos/asf/flink/blob/f214e7d8/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 07acd4f..a32c7f8 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -52,6 +52,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -79,6 +80,49 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		extraProperties.put("isolation.level", "read_committed");
 	}
 
+	/**
+	 * This test ensures that transactions reusing transactional.ids (after they are returned to the pool) will not clash
+	 * with previous transactions using the same transactional.ids.
+	 */
+	@Test(timeout = 120_000L)
+	public void testRestoreToCheckpointAfterExceedingProducersPool() throws Exception {
+		String topic = "flink-kafka-producer-fail-before-notify";
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic)) {
+			testHarness1.setup();
+			testHarness1.open();
+			testHarness1.processElement(42, 0);
+			OperatorStateHandles snapshot = testHarness1.snapshot(0, 0);
+			testHarness1.processElement(43, 0);
+			testHarness1.notifyOfCompletedCheckpoint(0);
+			try {
+				for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE; i++) {
+					testHarness1.snapshot(i + 1, 0);
+					testHarness1.processElement(i, 0);
+				}
+				throw new IllegalStateException("This should not be reached.");
+			}
+			catch (Exception ex) {
+				assertIsCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex);
+			}
+
+			// Resume transactions before testHarness1 is closed (in case of failure, close() might not be called)
+			try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
+				testHarness2.setup();
+				// restore from snapshot1, transactions with records 43 and 44 should be aborted
+				testHarness2.initializeState(snapshot);
+				testHarness2.open();
+			}
+
+			assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
+			deleteTestTopic(topic);
+		}
+		catch (Exception ex) {
+			// testHarness1 will be fenced off after creating and closing testHarness2
+			assertIsCausedBy(ProducerFencedException.class, ex);
+		}
+	}
+
 	@Test(timeout = 120_000L)
 	public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
 		String topic = "flink-kafka-producer-fail-before-notify";
@@ -563,4 +607,25 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
 		return properties;
 	}
+
+	private void assertIsCausedBy(Class<?> clazz, Throwable ex) {
+		for (int depth = 0; depth < 50 && ex != null; depth++) {
+			if (clazz.isInstance(ex)) {
+				return;
+			}
+			ex = ex.getCause();
+		}
+		fail(String.format("Exception [%s] was not caused by [%s]", ex, clazz));
+	}
+
+	private void assertIsCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) {
+		for (int depth = 0; depth < 50 && ex != null; depth++) {
+			if (ex instanceof FlinkKafka011Exception) {
+				assertEquals(expectedErrorCode, ((FlinkKafka011Exception) ex).getErrorCode());
+				return;
+			}
+			ex = ex.getCause();
+		}
+		fail(String.format("Exception [%s] was not caused by FlinkKafka011Exception[errorCode=%s]", ex, expectedErrorCode));
+	}
 }