Posted to commits@flink.apache.org by al...@apache.org on 2017/11/23 13:44:41 UTC

[2/3] flink git commit: [FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

Previously, the following scenario was faulty with a producer pool of size 2:

1. started txn1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with producerB, wrote record 43
3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
4. checkpoint 2 triggered, pre-committing txn2; started txn3 with producerA, wrote record 44
5. crash....
6. recovered to checkpoint 1; txn1 from producerA found in "pendingCommitTransactions", attempting to recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are indistinguishable from the Kafka broker's perspective, so the attempt to recoverAndCommit(txn1) commits txn3 as well

The result is that both records 42 and 44 are committed, even though record 44 was written after the restored checkpoint.

With this fix, the producer is re-initialized on every checkpoint, so txn3 obtains different producerId/epoch counters than txn1 and recovering txn1 can no longer accidentally commit txn3.
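
The fix leans on the fencing semantics of Kafka's transaction protocol: re-running initTransactions() for an already-used transactional.id bumps the epoch stored for that id and fences off any older producer instance. A minimal standalone sketch of this behavior (illustration only, not part of this commit; the broker address and id are placeholder assumptions):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class EpochFencingSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
            props.put("transactional.id", "demo-txn-id"); // hypothetical id
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());

            // The first producer registers a producerId with some epoch N.
            KafkaProducer<byte[], byte[]> first = new KafkaProducer<>(props);
            first.initTransactions();

            // Re-initializing with the same transactional.id bumps the epoch
            // to N + 1 and fences off the first producer.
            KafkaProducer<byte[], byte[]> second = new KafkaProducer<>(props);
            second.initTransactions();

            // From here on, any transactional call on `first` throws
            // ProducerFencedException, so its stale transactions can no
            // longer be confused with those of the new producer.
            second.close();
            first.close();
        }
    }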


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f214e7d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f214e7d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f214e7d8

Branch: refs/heads/master
Commit: f214e7d8ed5a38a8b56f12cf8d7bc7dd0ba31189
Parents: 2ac32c5
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Nov 22 15:53:08 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 23 14:44:18 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java | 85 ++++++--------------
 .../kafka/FlinkKafkaProducer011ITCase.java      | 65 +++++++++++++++
 2 files changed, 91 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f214e7d8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0c741f5..b14e487 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -223,16 +222,11 @@ public class FlinkKafkaProducer011<IN>
 	private final int kafkaProducersPoolSize;
 
 	/**
-	 * Available transactional ids.
+	 * Pool of available transactional ids.
 	 */
 	private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
 
 	/**
-	 * Pool of KafkaProducers objects.
-	 */
-	private transient Optional<ProducersPool> producersPool = Optional.empty();
-
-	/**
 	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
 	 */
 	private boolean writeTimestampToKafka = false;
@@ -599,12 +593,6 @@ public class FlinkKafkaProducer011<IN>
 		catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
 		}
-		try {
-			producersPool.ifPresent(pool -> pool.close());
-		}
-		catch (Exception e) {
-			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
-		}
 		// make sure we propagate pending errors
 		checkErroneous();
 	}
@@ -615,7 +603,7 @@ public class FlinkKafkaProducer011<IN>
 	protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
 		switch (semantic) {
 			case EXACTLY_ONCE:
-				FlinkKafkaProducer<byte[], byte[]> producer = createOrGetProducerFromPool();
+				FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
 				producer.beginTransaction();
 				return new KafkaTransactionState(producer.getTransactionalId(), producer);
 			case AT_LEAST_ONCE:
@@ -631,21 +619,6 @@ public class FlinkKafkaProducer011<IN>
 		}
 	}
 
-	private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws FlinkKafka011Exception {
-		FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll();
-		if (producer == null) {
-			String transactionalId = availableTransactionalIds.poll();
-			if (transactionalId == null) {
-				throw new FlinkKafka011Exception(
-					FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
-					"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
-			}
-			producer = initTransactionalProducer(transactionalId, true);
-			producer.initTransactions();
-		}
-		return producer;
-	}
-
 	@Override
 	protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
 		switch (semantic) {
@@ -666,7 +639,7 @@ public class FlinkKafkaProducer011<IN>
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				transaction.producer.commitTransaction();
-				getProducersPool().add(transaction.producer);
+				recycleTransactionalProducer(transaction.producer);
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:
@@ -706,7 +679,7 @@ public class FlinkKafkaProducer011<IN>
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				transaction.producer.abortTransaction();
-				getProducersPool().add(transaction.producer);
+				recycleTransactionalProducer(transaction.producer);
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:
@@ -796,10 +769,7 @@ public class FlinkKafkaProducer011<IN>
 
 		if (semantic != Semantic.EXACTLY_ONCE) {
 			nextTransactionalIdHint = null;
-			producersPool = Optional.empty();
 		} else {
-			producersPool = Optional.of(new ProducersPool());
-
 			ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
 			if (transactionalIdHints.size() > 1) {
 				throw new IllegalStateException(
@@ -883,16 +853,33 @@ public class FlinkKafkaProducer011<IN>
 		return currentTransaction.producer.getTransactionCoordinatorId();
 	}
 
+	/**
+	 * For each checkpoint we create a new {@link FlinkKafkaProducer} so that new transactions will not clash
+	 * with transactions created during previous checkpoints ({@code producer.initTransactions()} ensures that we
+	 * obtain new producerId and epoch counters).
+	 */
+	private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafka011Exception {
+		String transactionalId = availableTransactionalIds.poll();
+		if (transactionalId == null) {
+			throw new FlinkKafka011Exception(
+				FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
+				"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
+		}
+		FlinkKafkaProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
+		producer.initTransactions();
+		return producer;
+	}
+
+	private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
+		availableTransactionalIds.add(producer.getTransactionalId());
+		producer.close();
+	}
+
 	private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
 		producerConfig.put("transactional.id", transactionalId);
 		return initProducer(registerMetrics);
 	}
 
-	private ProducersPool getProducersPool() {
-		checkState(producersPool.isPresent(), "Trying to access uninitialized producer pool");
-		return producersPool.get();
-	}
-
 	private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
 		FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);
 
@@ -951,7 +938,6 @@ public class FlinkKafkaProducer011<IN>
 
 	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
-		producersPool = Optional.empty();
 	}
 
 	private static Properties getPropertiesFromBrokerList(String brokerList) {
@@ -1264,25 +1250,6 @@ public class FlinkKafkaProducer011<IN>
 		}
 	}
 
-	static class ProducersPool implements Closeable {
-		private final LinkedBlockingDeque<FlinkKafkaProducer<byte[], byte[]>> pool = new LinkedBlockingDeque<>();
-
-		public void add(FlinkKafkaProducer<byte[], byte[]> producer) {
-			pool.add(producer);
-		}
-
-		public FlinkKafkaProducer<byte[], byte[]> poll() {
-			return pool.poll();
-		}
-
-		@Override
-		public void close() {
-			while (!pool.isEmpty()) {
-				pool.poll().close();
-			}
-		}
-	}
-
 	/**
 	 * Keep information required to deduce next safe to use transactional id.
 	 */

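As a plain-Java recap of the change above: after this commit only transactional ids live in the pool, while producer instances are created per checkpoint and closed on commit or abort. A distilled model of that scheme (hypothetical helper class, not code from the commit):

    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;

    /**
     * Distilled model of the pooling scheme after this commit:
     * transactional ids are recycled, producer instances are not.
     */
    public class TransactionalIdPool {
        private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();

        public TransactionalIdPool(int poolSize) {
            for (int i = 0; i < poolSize; i++) {
                availableTransactionalIds.add("txn-id-" + i); // hypothetical id scheme
            }
        }

        /**
         * beginTransaction(): take an id; the caller builds a brand-new producer
         * for it and calls initTransactions(), obtaining fresh producerId/epoch
         * counters.
         */
        public String take() {
            String transactionalId = availableTransactionalIds.poll();
            if (transactionalId == null) {
                // Corresponds to FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY:
                // more concurrent checkpoints than pooled ids.
                throw new IllegalStateException("Too many ongoing snapshots.");
            }
            return transactionalId;
        }

        /**
         * commitTransaction()/abortTransaction(): recycle only the id;
         * the producer itself is closed.
         */
        public void recycle(String transactionalId) {
            availableTransactionalIds.add(transactionalId);
        }
    }

With a pool of size P this allows at most P simultaneously open transactions; the new IT case below exhausts exactly this pool to trigger the PRODUCERS_POOL_EMPTY error.
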
http://git-wip-us.apache.org/repos/asf/flink/blob/f214e7d8/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 07acd4f..a32c7f8 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -52,6 +52,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -79,6 +80,49 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		extraProperties.put("isolation.level", "read_committed");
 	}
 
+	/**
+	 * This test ensures that transactions reusing transactional.ids (after returning to the pool) will not clash
+	 * with previous transactions using the same transactional.ids.
+	 */
+	@Test(timeout = 120_000L)
+	public void testRestoreToCheckpointAfterExceedingProducersPool() throws Exception {
+		String topic = "flink-kafka-producer-fail-before-notify";
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic)) {
+			testHarness1.setup();
+			testHarness1.open();
+			testHarness1.processElement(42, 0);
+			OperatorStateHandles snapshot = testHarness1.snapshot(0, 0);
+			testHarness1.processElement(43, 0);
+			testHarness1.notifyOfCompletedCheckpoint(0);
+			try {
+				for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE; i++) {
+					testHarness1.snapshot(i + 1, 0);
+					testHarness1.processElement(i, 0);
+				}
+				throw new IllegalStateException("This should not be reached.");
+			}
+			catch (Exception ex) {
+				assertIsCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex);
+			}
+
+			// Resume transactions before testHarness1 is closed (in case of failure, close() might not be called)
+			try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
+				testHarness2.setup();
+				// restore from snapshot1, transactions with records 43 and 44 should be aborted
+				testHarness2.initializeState(snapshot);
+				testHarness2.open();
+			}
+
+			assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
+			deleteTestTopic(topic);
+		}
+		catch (Exception ex) {
+			// testHarness1 will be fenced off after creating and closing testHarness2
+			assertIsCausedBy(ProducerFencedException.class, ex);
+		}
+	}
+
 	@Test(timeout = 120_000L)
 	public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
 		String topic = "flink-kafka-producer-fail-before-notify";
@@ -563,4 +607,25 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
 		return properties;
 	}
+
+	private void assertIsCausedBy(Class<?> clazz, Throwable ex) {
+		for (int depth = 0; depth < 50 && ex != null; depth++) {
+			if (clazz.isInstance(ex)) {
+				return;
+			}
+			ex = ex.getCause();
+		}
+		fail(String.format("Exception [%s] was not caused by [%s]", ex, clazz));
+	}
+
+	private void assertIsCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) {
+		for (int depth = 0; depth < 50 && ex != null; depth++) {
+			if (ex instanceof FlinkKafka011Exception) {
+				assertEquals(expectedErrorCode, ((FlinkKafka011Exception) ex).getErrorCode());
+				return;
+			}
+			ex = ex.getCause();
+		}
+		fail(String.format("Exception [%s] was not caused by FlinkKafka011Exception[errorCode=%s]", ex, expectedErrorCode));
+	}
 }