Posted to commits@flink.apache.org by al...@apache.org on 2017/10/09 16:59:13 UTC

[7/9] flink git commit: [FLINK-6988][kafka] Add test for failure before first checkpoint and scaling down

[FLINK-6988][kafka] Add test for failure before first checkpoint and scaling down


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ada50b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ada50b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ada50b3

Branch: refs/heads/master
Commit: 4ada50b3dd7c4af9735585a8c45eda4de10bb6e5
Parents: 867c012
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Aug 24 14:16:55 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011Tests.java       | 114 ++++++++++++++++---
 .../util/OneInputStreamOperatorTestHarness.java |  11 ++
 2 files changed, 108 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
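
For context, here is a minimal sketch of the failure mode the new test covers, written against the plain Kafka 0.11 client API. This is illustration only, not part of the commit; the broker address, topic, and transactional.id are placeholders. A transaction that is opened but never committed or aborted keeps the partition's last stable offset pinned, so a consumer with isolation.level=read_committed cannot see any records committed after it:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class LingeringTransactionSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            props.put("transactional.id", "sink-subtask-0"); // placeholder, one stable id per subtask
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("lingering-topic", "key", "value"));
            // Simulated crash: neither commitTransaction() nor abortTransaction() runs,
            // so the transaction lingers on the broker. A read_committed consumer
            // cannot advance past this transaction's first offset and therefore also
            // misses records committed after it.
        }
    }

This is exactly the situation the test below provokes by snapshotting and then abandoning several producer subtasks before their first completed checkpoint.
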


http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
index 51410da..dd21bf4 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -258,26 +259,61 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
-	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
-		Properties properties = createProperties();
+	/**
+	 * This test checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure
+	 * that happened before the first checkpoint and was followed by restarting with reduced parallelism.
+	 * If such transactions were left lingering, consumers would be unable to read committed records
+	 * that were created after the lingering transaction.
+	 */
+	@Test(timeout = 120_000L)
+	public void testScaleDownBeforeFirstCheckpoint() throws Exception {
+		String topic = "scale-down-before-first-checkpoint";
+
+		List<AutoCloseable> operatorsToClose = new ArrayList<>();
+		int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR);
+		for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) {
+			OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness(
+				topic,
+				preScaleDownParallelism,
+				preScaleDownParallelism,
+				subtaskIndex);
+
+			preScaleDownOperator.setup();
+			preScaleDownOperator.open();
+			preScaleDownOperator.processElement(subtaskIndex * 2, 0);
+			preScaleDownOperator.snapshot(0, 1);
+			preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2);
+
+			operatorsToClose.add(preScaleDownOperator);
+		}
 
-		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
-			topic,
-			integerKeyedSerializationSchema,
-			properties,
-			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+		// do not close the previous test harnesses, to make sure that closing does not clean anything up
+		// (in case of failure there might not be any close() call at all)
 
-		return new OneInputStreamOperatorTestHarness<>(
-			new StreamSink<>(kafkaProducer),
-			IntSerializer.INSTANCE);
-	}
+		// After the previous failure, simulate restarting the application with a smaller parallelism
+		OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
 
-	private Properties createProperties() {
-		Properties properties = new Properties();
-		properties.putAll(standardProps);
-		properties.putAll(secureProps);
-		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
-		return properties;
+		postScaleDownOperator1.setup();
+		postScaleDownOperator1.open();
+
+		// write and commit more records, after potentially lingering transactions
+		postScaleDownOperator1.processElement(46, 7);
+		postScaleDownOperator1.snapshot(4, 8);
+		postScaleDownOperator1.processElement(47, 9);
+		postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
+
+		// now we should have:
+		// - the records written before the scale-down (subtaskIndex * 2 and subtaskIndex * 2 + 1 for each subtask) in aborted transactions
+		// - a committed transaction with record 46
+		// - a pending transaction with record 47
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L);
+
+		postScaleDownOperator1.close();
+		// ignore ProducerFencedExceptions, because postScaleDownOperator1 could have reused their transactional ids
+		for (AutoCloseable operatorToClose : operatorsToClose) {
+			closeIgnoringProducerFenced(operatorToClose);
+		}
+		deleteTestTopic(topic);
 	}
 
 	@Test
@@ -363,4 +399,48 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
 			assertEquals(expectedValue, record.value());
 		}
 	}
+
+	private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
+		try {
+			autoCloseable.close();
+		}
+		catch (Exception ex) {
+			if (!(ex.getCause() instanceof ProducerFencedException)) {
+				throw ex;
+			}
+		}
+	}
+
+	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
+		return createTestHarness(topic, 1, 1, 0);
+	}
+
+	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
+		String topic,
+		int maxParallelism,
+		int parallelism,
+		int subtaskIndex) throws Exception {
+		Properties properties = createProperties();
+
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			topic,
+			integerKeyedSerializationSchema,
+			properties,
+			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+		return new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			maxParallelism,
+			parallelism,
+			subtaskIndex,
+			IntSerializer.INSTANCE);
+	}
+
+	private Properties createProperties() {
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
+		return properties;
+	}
 }
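
A note on closeIgnoringProducerFenced above: Kafka's fencing mechanism is also what aborts the lingering transactions. Sketched below with the plain Kafka client API, again as illustration only with placeholder values: creating a new producer with the transactional.id of a possibly-failed one and calling initTransactions() aborts whatever transaction the old instance left open and fences that instance, whose next transactional call then fails with ProducerFencedException. This is why the test tolerates ProducerFencedException when closing the pre-scale-down operators after postScaleDownOperator1 has reused their ids.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;

    public class FencingSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            props.put("transactional.id", "sink-subtask-0"); // same id the failed producer used
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> recovered = new KafkaProducer<>(props)) {
                // Aborts the lingering transaction of the previous producer with this id
                // and fences that producer; its next transactional call will throw
                // ProducerFencedException.
                recovered.initTransactions();
            }
        }
    }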

http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index c8fa2a4..5c7d986 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -51,6 +51,17 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 
 	public OneInputStreamOperatorTestHarness(
 		OneInputStreamOperator<IN, OUT> operator,
+		int maxParallelism,
+		int parallelism,
+		int subtaskIndex,
+		TypeSerializer<IN> typeSerializerIn) throws Exception {
+		this(operator, maxParallelism, parallelism, subtaskIndex);
+
+		config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
+	}
+
+	public OneInputStreamOperatorTestHarness(
+		OneInputStreamOperator<IN, OUT> operator,
 		TypeSerializer<IN> typeSerializerIn,
 		Environment environment) throws Exception {
 		this(operator, environment);
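
The new OneInputStreamOperatorTestHarness constructor added above makes it possible to simulate one specific subtask of a parallel operator in a unit test, which is what the Kafka test uses to model the pre- and post-scale-down topologies. A hypothetical usage sketch (the StreamMap operator and the values are illustration only, not from this commit):

    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.streaming.api.operators.StreamMap;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

    public class HarnessUsageSketch {
        public static void main(String[] args) throws Exception {
            // simulate subtask 0 of a parallelism-3 operator with max parallelism 4
            OneInputStreamOperatorTestHarness<String, String> harness =
                new OneInputStreamOperatorTestHarness<>(
                    new StreamMap<String, String>(String::toUpperCase),
                    4, // maxParallelism
                    3, // parallelism
                    0, // subtaskIndex
                    StringSerializer.INSTANCE);

            harness.setup();
            harness.open();
            harness.processElement("a", 0); // element with timestamp 0
            harness.close();
        }
    }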