Posted to commits@flink.apache.org by al...@apache.org on 2017/11/20 12:25:20 UTC

[1/2] flink git commit: [FLINK-8086][kafka] Ignore ProducerFencedException during recovery

Repository: flink
Updated Branches:
  refs/heads/master 7c63526ad -> d57ebd063


[FLINK-8086][kafka] Ignore ProducerFencedException during recovery

ProducerFencedException can happen if we restore twice from the same
checkpoint or if we restore from an old savepoint. In both cases the
transactional.ids that we want to recoverAndCommit have already been
committed and reused. Reuse means that Kafka's brokers now know them under
a newer producerId/epoch, so trying to commit an old (and already
committed) transaction again results in a ProducerFencedException.

Ignoring this exception might hide some bugs/issues, because instead of
failing we might have semi-silent (warning-only) data loss.
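
For context, this is the standard Kafka 0.11 fencing mechanism. A minimal
sketch (plain Kafka producer API, not Flink code; the broker address, topic
and transactional.id are placeholders) of how reusing a transactional.id
fences the older producer incarnation:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "flink-tid-0");
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    KafkaProducer<byte[], byte[]> oldIncarnation = new KafkaProducer<>(props);
    oldIncarnation.initTransactions();  // broker registers the id under epoch N

    // Restoring/reusing the id creates a second producer with the same
    // transactional.id:
    KafkaProducer<byte[], byte[]> newIncarnation = new KafkaProducer<>(props);
    newIncarnation.initTransactions();  // bumps the epoch, fencing the old one

    oldIncarnation.beginTransaction();
    oldIncarnation.send(new ProducerRecord<>("topic", new byte[4]));
    // transactional calls from the fenced (old-epoch) producer now fail with
    // ProducerFencedException, which is the situation recoverAndCommit hits
    oldIncarnation.commitTransaction();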


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d57ebd06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d57ebd06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d57ebd06

Branch: refs/heads/master
Commit: d57ebd063bad571d0ea276da5beee18aeb568b50
Parents: 34120ef
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Nov 17 14:40:30 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 20 13:24:50 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java | 20 ++++-----
 .../kafka/FlinkKafkaProducer011ITCase.java      | 47 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d57ebd06/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 611a3d5..6b0136d 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -678,14 +679,12 @@ public class FlinkKafkaProducer011<IN>
 	protected void recoverAndCommit(KafkaTransactionState transaction) {
 		switch (semantic) {
 			case EXACTLY_ONCE:
-				FlinkKafkaProducer<byte[], byte[]> producer =
-					initTransactionalProducer(transaction.transactionalId, false);
-				producer.resumeTransaction(transaction.producerId, transaction.epoch);
-				try {
+				try (FlinkKafkaProducer<byte[], byte[]> producer =
+						initTransactionalProducer(transaction.transactionalId, false)) {
+					producer.resumeTransaction(transaction.producerId, transaction.epoch);
 					producer.commitTransaction();
-					producer.close();
 				}
-				catch (InvalidTxnStateException ex) {
+				catch (InvalidTxnStateException | ProducerFencedException ex) {
 					// That means we have committed this transaction before.
 					LOG.warn("Encountered error {} while recovering transaction {}. " +
 						"Presumably this transaction has been already committed before",
@@ -720,11 +719,10 @@ public class FlinkKafkaProducer011<IN>
 	protected void recoverAndAbort(KafkaTransactionState transaction) {
 		switch (semantic) {
 			case EXACTLY_ONCE:
-				FlinkKafkaProducer<byte[], byte[]> producer =
-					initTransactionalProducer(transaction.transactionalId, false);
-				producer.resumeTransaction(transaction.producerId, transaction.epoch);
-				producer.abortTransaction();
-				producer.close();
+				try (FlinkKafkaProducer<byte[], byte[]> producer =
+						initTransactionalProducer(transaction.transactionalId, false)) {
+					producer.initTransactions();
+				}
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:

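Note on the recoverAndAbort change above: with the plain Kafka client,
initTransactions() for a given transactional.id makes the transaction
coordinator abort any transaction left open by a previous producer
incarnation, and it does not need the old producerId/epoch, so it cannot be
fenced the way the previous resumeTransaction()/abortTransaction() sequence
could. A minimal sketch, assuming props is configured as in the sketch
above:

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
        // initTransactions() implicitly aborts any lingering transaction for
        // this transactional.id and bumps the epoch; no resumeTransaction()
        // into the old epoch is needed, hence no ProducerFencedException
        producer.initTransactions();
    }
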
http://git-wip-us.apache.org/repos/asf/flink/blob/d57ebd06/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 922344d..07acd4f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -219,6 +219,53 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
+	@Test(timeout = 120_000L)
+	public void testFailAndRecoverSameCheckpointTwice() throws Exception {
+		String topic = "flink-kafka-producer-fail-and-recover-same-checkpoint-twice";
+
+		OperatorStateHandles snapshot1;
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+			testHarness.setup();
+			testHarness.open();
+			testHarness.processElement(42, 0);
+			testHarness.snapshot(0, 1);
+			testHarness.processElement(43, 2);
+			snapshot1 = testHarness.snapshot(1, 3);
+
+			testHarness.processElement(44, 4);
+		}
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+			testHarness.setup();
+			// restore from snapshot1, transactions with records 44 and 45 should be aborted
+			testHarness.initializeState(snapshot1);
+			testHarness.open();
+
+			// write and commit more records, after potentially lingering transactions
+			testHarness.processElement(44, 7);
+			testHarness.snapshot(2, 8);
+			testHarness.processElement(45, 9);
+		}
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+			testHarness.setup();
+			// restore from snapshot1, transactions with records 44 and 45 should be aborted
+			testHarness.initializeState(snapshot1);
+			testHarness.open();
+
+			// write and commit more records, after potentially lingering transactions
+			testHarness.processElement(44, 7);
+			testHarness.snapshot(3, 8);
+			testHarness.processElement(45, 9);
+		}
+
+		//now we should have:
+		// - records 42 and 43 in committed transactions
+		// - aborted transactions with records 44 and 45
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+		deleteTestTopic(topic);
+	}
+
 	/**
 	 * This test checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure,
 	 * which happened before the first checkpoint and was followed by reducing the parallelism.


[2/2] flink git commit: [hotfix][kafka] Improve logging in FlinkKafkaProducer011

Posted by al...@apache.org.
[hotfix][kafka] Improve logging in FlinkKafkaProducer011


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34120efe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34120efe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34120efe

Branch: refs/heads/master
Commit: 34120efe05ba1141de50b1c39318d70c70be516e
Parents: 7c63526
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Nov 17 14:31:07 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 20 13:24:50 2017 +0100

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaProducer011.java     | 7 ++++++-
 .../connectors/kafka/internal/FlinkKafkaProducer.java         | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/34120efe/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 08599d8..611a3d5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -1022,7 +1022,12 @@ public class FlinkKafkaProducer011<IN>
 
 		@Override
 		public String toString() {
-			return String.format("%s [transactionalId=%s]", this.getClass().getSimpleName(), transactionalId);
+			return String.format(
+				"%s [transactionalId=%s, producerId=%s, epoch=%s]",
+				this.getClass().getSimpleName(),
+				transactionalId,
+				producerId,
+				epoch);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/34120efe/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 9d50379..2f58d56 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -188,7 +188,7 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	 */
 	public void resumeTransaction(long producerId, short epoch) {
 		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch);
-		LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);
+		LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
 
 		Object transactionManager = getValue(kafkaProducer, "transactionManager");
 		synchronized (transactionManager) {