You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 10:47:23 UTC
[flink] 04/04: [FLINK-10774][tests] Refactor FlinkKafkaConsumerBaseTest#testConsumerLifeCycle
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3cbaabc527dee12a1a28ba5542cea60a939baf75
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Jan 30 17:43:20 2019 +0100
[FLINK-10774][tests] Refactor FlinkKafkaConsumerBaseTest#testConsumerLifeCycle
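Split #testConsumerLifeCycle into two methods, one for each branch of its former if-else:
#testFailingConsumerLifecycle (an expected exception is passed in) and
#testNormalConsumerLifecycle (no exception is expected).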
This closes #7606.
---
.../kafka/FlinkKafkaConsumerBaseTest.java | 46 ++++++++++------------
1 file changed, 21 insertions(+), 25 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index af4fd98..e59e2a6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -64,7 +64,6 @@ import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayDeque;
@@ -469,7 +468,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(failingPartitionDiscoverer);
- testConsumerLifeCycle(consumer, failureCause);
+ testFailingConsumerLifecycle(consumer, failureCause);
assertTrue("partitionDiscoverer should be closed when consumer is closed", failingPartitionDiscoverer.isClosed());
}
@@ -485,7 +484,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
testPartitionDiscoverer,
100L);
- testConsumerLifeCycle(consumer, failureCause);
+ testFailingConsumerLifecycle(consumer, failureCause);
assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
}
@@ -503,40 +502,37 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(() -> mock, testPartitionDiscoverer, 100L);
- testConsumerLifeCycle(consumer, failureCause);
+ testFailingConsumerLifecycle(consumer, failureCause);
assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
}
+ private void testFailingConsumerLifecycle(FlinkKafkaConsumerBase<String> testKafkaConsumer, @Nonnull Exception expectedException) throws Exception {
+ try {
+ setupConsumer(testKafkaConsumer);
+ testKafkaConsumer.run(new TestSourceContext<>());
+
+ fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
+ } catch (Exception e) {
+ assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true));
+ }
+ testKafkaConsumer.close();
+ }
+
@Test
public void testClosePartitionDiscovererWithCancellation() throws Exception {
final DummyPartitionDiscoverer testPartitionDiscoverer = new DummyPartitionDiscoverer();
final TestingFlinkKafkaConsumer<String> consumer = new TestingFlinkKafkaConsumer<>(testPartitionDiscoverer, 100L);
- testConsumerLifeCycle(consumer, null);
+ testNormalConsumerLifecycle(consumer);
assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
}
- private void testConsumerLifeCycle(
- FlinkKafkaConsumerBase<String> testKafkaConsumer,
- @Nullable Exception expectedException) throws Exception {
-
- if (expectedException == null) {
- setupConsumer(testKafkaConsumer);
- final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
- testKafkaConsumer.close();
- runFuture.get();
- } else {
- try {
- setupConsumer(testKafkaConsumer);
- testKafkaConsumer.run(new TestSourceContext<>());
-
- fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
- } catch (Exception e) {
- assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true));
- }
- testKafkaConsumer.close();
- }
+ private void testNormalConsumerLifecycle(FlinkKafkaConsumerBase<String> testKafkaConsumer) throws Exception {
+ setupConsumer(testKafkaConsumer);
+ final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
+ testKafkaConsumer.close();
+ runFuture.get();
}
private void setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Exception {