You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/09 16:59:13 UTC
[7/9] flink git commit: [FLINK-6988][kafka] Add test for failure
before first checkpoint and scaling down
[FLINK-6988][kafka] Add test for failure before first checkpoint and scaling down
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ada50b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ada50b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ada50b3
Branch: refs/heads/master
Commit: 4ada50b3dd7c4af9735585a8c45eda4de10bb6e5
Parents: 867c012
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Aug 24 14:16:55 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200
----------------------------------------------------------------------
.../kafka/FlinkKafkaProducer011Tests.java | 114 ++++++++++++++++---
.../util/OneInputStreamOperatorTestHarness.java | 11 ++
2 files changed, 108 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
index 51410da..dd21bf4 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.junit.Before;
import org.junit.Test;
@@ -258,26 +259,61 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
deleteTestTopic(topic);
}
- private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
- Properties properties = createProperties();
+ /**
+ * This test checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure
+ * which happened before the first checkpoint and was followed by reducing the parallelism.
+ * If such transactions were left lingering, consumers would be unable to read committed records
+ * that were created after the lingering transaction.
+ */
+ @Test(timeout = 120_000L)
+ public void testScaleDownBeforeFirstCheckpoint() throws Exception {
+ String topic = "scale-down-before-first-checkpoint";
+
+ List<AutoCloseable> operatorsToClose = new ArrayList<>();
+ int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR);
+ for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) {
+ OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness(
+ topic,
+ preScaleDownParallelism,
+ preScaleDownParallelism,
+ subtaskIndex);
+
+ preScaleDownOperator.setup();
+ preScaleDownOperator.open();
+ preScaleDownOperator.processElement(subtaskIndex * 2, 0);
+ preScaleDownOperator.snapshot(0, 1);
+ preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2);
+
+ operatorsToClose.add(preScaleDownOperator);
+ }
- FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
- topic,
- integerKeyedSerializationSchema,
- properties,
- FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+ // do not close the previous test harnesses, to make sure that closing does not clean up anything (in case of
+ // failure there might not be any close)
- return new OneInputStreamOperatorTestHarness<>(
- new StreamSink<>(kafkaProducer),
- IntSerializer.INSTANCE);
- }
+ // After previous failure simulate restarting application with smaller parallelism
+ OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
- private Properties createProperties() {
- Properties properties = new Properties();
- properties.putAll(standardProps);
- properties.putAll(secureProps);
- properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
- return properties;
+ postScaleDownOperator1.setup();
+ postScaleDownOperator1.open();
+
+ // write and commit more records, after potentially lingering transactions
+ postScaleDownOperator1.processElement(46, 7);
+ postScaleDownOperator1.snapshot(4, 8);
+ postScaleDownOperator1.processElement(47, 9);
+ postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
+
+ //now we should have:
+ // - records written by the pre-scale-down operators (subtaskIndex * 2 and subtaskIndex * 2 + 1) in aborted transactions
+ // - committed transaction with record 46
+ // - pending transaction with record 47
+ assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L);
+
+ postScaleDownOperator1.close();
+ // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids.
+ for (AutoCloseable operatorToClose : operatorsToClose) {
+ closeIgnoringProducerFenced(operatorToClose);
+ }
+ deleteTestTopic(topic);
}
@Test
@@ -363,4 +399,48 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
assertEquals(expectedValue, record.value());
}
}
+
+ private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
+ try {
+ autoCloseable.close();
+ }
+ catch (Exception ex) {
+ if (!(ex.getCause() instanceof ProducerFencedException)) {
+ throw ex;
+ }
+ }
+ }
+
+ private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
+ return createTestHarness(topic, 1, 1, 0);
+ }
+
+ private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
+ String topic,
+ int maxParallelism,
+ int parallelism,
+ int subtaskIndex) throws Exception {
+ Properties properties = createProperties();
+
+ FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+ topic,
+ integerKeyedSerializationSchema,
+ properties,
+ FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+ return new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(kafkaProducer),
+ maxParallelism,
+ parallelism,
+ subtaskIndex,
+ IntSerializer.INSTANCE);
+ }
+
+ private Properties createProperties() {
+ Properties properties = new Properties();
+ properties.putAll(standardProps);
+ properties.putAll(secureProps);
+ properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
+ return properties;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index c8fa2a4..5c7d986 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -51,6 +51,17 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
public OneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
+ int maxParallelism,
+ int parallelism,
+ int subtaskIndex,
+ TypeSerializer<IN> typeSerializerIn) throws Exception {
+ this(operator, maxParallelism, parallelism, subtaskIndex);
+
+ config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
+ }
+
+ public OneInputStreamOperatorTestHarness(
+ OneInputStreamOperator<IN, OUT> operator,
TypeSerializer<IN> typeSerializerIn,
Environment environment) throws Exception {
this(operator, environment);