You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/20 12:37:24 UTC

[2/2] flink git commit: [FLINK-8086][kafka] Ignore ProducerFencedException during recovery

[FLINK-8086][kafka] Ignore ProducerFencedException during recovery

ProducerFencedException can happen if we restore twice from the same checkpoint
or if we restore from an old savepoint. In both cases the transactional.ids that we
want to recoverAndCommit have already been committed and reused. Reusing means
that they will be known by Kafka's brokers under a newer producerId/epochId,
which will result in ProducerFencedException if we try to commit again some
old (and already committed) transaction.

Ignoring this exception might hide some bugs/issues, because instead of failing
we might have semi-silent (warning-only) data loss.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f3f8c77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f3f8c77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f3f8c77

Branch: refs/heads/release-1.4
Commit: 2f3f8c773d50cca25a0c304c8348ac4db362b136
Parents: 1a6121a
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Nov 17 14:40:30 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 20 13:26:15 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java | 20 ++++-----
 .../kafka/FlinkKafkaProducer011ITCase.java      | 47 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f3f8c77/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 611a3d5..6b0136d 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -678,14 +679,12 @@ public class FlinkKafkaProducer011<IN>
 	protected void recoverAndCommit(KafkaTransactionState transaction) {
 		switch (semantic) {
 			case EXACTLY_ONCE:
-				FlinkKafkaProducer<byte[], byte[]> producer =
-					initTransactionalProducer(transaction.transactionalId, false);
-				producer.resumeTransaction(transaction.producerId, transaction.epoch);
-				try {
+				try (FlinkKafkaProducer<byte[], byte[]> producer =
+						initTransactionalProducer(transaction.transactionalId, false)) {
+					producer.resumeTransaction(transaction.producerId, transaction.epoch);
 					producer.commitTransaction();
-					producer.close();
 				}
-				catch (InvalidTxnStateException ex) {
+				catch (InvalidTxnStateException | ProducerFencedException ex) {
 					// That means we have committed this transaction before.
 					LOG.warn("Encountered error {} while recovering transaction {}. " +
 						"Presumably this transaction has been already committed before",
@@ -720,11 +719,10 @@ public class FlinkKafkaProducer011<IN>
 	protected void recoverAndAbort(KafkaTransactionState transaction) {
 		switch (semantic) {
 			case EXACTLY_ONCE:
-				FlinkKafkaProducer<byte[], byte[]> producer =
-					initTransactionalProducer(transaction.transactionalId, false);
-				producer.resumeTransaction(transaction.producerId, transaction.epoch);
-				producer.abortTransaction();
-				producer.close();
+				try (FlinkKafkaProducer<byte[], byte[]> producer =
+						initTransactionalProducer(transaction.transactionalId, false)) {
+					producer.initTransactions();
+				}
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:

http://git-wip-us.apache.org/repos/asf/flink/blob/2f3f8c77/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 922344d..07acd4f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -219,6 +219,53 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
+	@Test(timeout = 120_000L)
+	public void testFailAndRecoverSameCheckpointTwice() throws Exception {
+		String topic = "flink-kafka-producer-fail-and-recover-same-checkpoint-twice";
+
+		OperatorStateHandles snapshot1;
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+			testHarness.setup();
+			testHarness.open();
+			testHarness.processElement(42, 0);
+			testHarness.snapshot(0, 1);
+			testHarness.processElement(43, 2);
+			snapshot1 = testHarness.snapshot(1, 3);
+
+			testHarness.processElement(44, 4);
+		}
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+			testHarness.setup();
+			// restore from snapshot1, transactions with records 44 and 45 should be aborted
+			testHarness.initializeState(snapshot1);
+			testHarness.open();
+
+			// write and commit more records, after potentially lingering transactions
+			testHarness.processElement(44, 7);
+			testHarness.snapshot(2, 8);
+			testHarness.processElement(45, 9);
+		}
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+			testHarness.setup();
+			// restore from snapshot1, transactions with records 44 and 45 should be aborted
+			testHarness.initializeState(snapshot1);
+			testHarness.open();
+
+			// write and commit more records, after potentially lingering transactions
+			testHarness.processElement(44, 7);
+			testHarness.snapshot(3, 8);
+			testHarness.processElement(45, 9);
+		}
+
+		//now we should have:
+		// - records 42 and 43 in committed transactions
+		// - aborted transactions with records 44 and 45
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+		deleteTestTopic(topic);
+	}
+
 	/**
 	 * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure,
 	 * which happened before first checkpoint and was followed up by reducing the parallelism.