You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 10:47:21 UTC
[flink] 02/04: [FLINK-10774] [tests] Refactor Kafka tests to have consistent life cycle verifications
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 232560d0c95f9bccfb199a799ea39a89b44d671b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 29 11:45:27 2019 +0800
[FLINK-10774] [tests] Refactor Kafka tests to have consistent life cycle verifications
---
.../kafka/FlinkKafkaConsumerBaseTest.java | 64 ++++++++++------------
1 file changed, 28 insertions(+), 36 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b190d34..54b2c8a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -64,6 +64,7 @@ import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayDeque;
@@ -468,13 +469,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(failingPartitionDiscoverer);
- try {
- setupConsumer(consumer);
- fail("Exception should be thrown in open method");
- } catch (RuntimeException e) {
- assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(failureCause)).isPresent(), is(true));
- }
- consumer.close();
+ testConsumerLifeCycle(consumer, failureCause);
assertTrue("partitionDiscoverer should be closed when consumer is closed", failingPartitionDiscoverer.isClosed());
}
@@ -490,15 +485,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
testPartitionDiscoverer,
100L);
- setupConsumer(consumer);
-
- try {
- consumer.run(new TestSourceContext<>());
- fail("Exception should be thrown in run method");
- } catch (Exception e) {
- assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(failureCause)).isPresent(), is(true));
- }
- consumer.close();
+ testConsumerLifeCycle(consumer, failureCause);
assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
}
@@ -512,16 +499,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(() -> mock, testPartitionDiscoverer, 100L);
- setupConsumer(consumer);
-
- try {
- consumer.run(new TestSourceContext<>());
- fail("Exception should be thrown in run method");
- } catch (Exception e) {
- assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(failureCause)).isPresent(), is(true));
- }
- consumer.close();
- consumer.joinDiscoveryLoopThread();
+ testConsumerLifeCycle(consumer, failureCause);
assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
}
@@ -531,19 +509,33 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
final TestingFlinkKafkaConsumer<String> consumer = new TestingFlinkKafkaConsumer<>(testPartitionDiscoverer, 100L);
- setupConsumer(consumer);
-
- CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> consumer.run(new TestSourceContext<>())));
-
- consumer.close();
-
- consumer.joinDiscoveryLoopThread();
- runFuture.get();
-
+ testConsumerLifeCycle(consumer, null);
assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
}
- protected void setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Exception {
+ private void testConsumerLifeCycle(
+ FlinkKafkaConsumerBase<String> testKafkaConsumer,
+ @Nullable Exception expectedException) throws Exception {
+
+ if (expectedException == null) {
+ setupConsumer(testKafkaConsumer);
+ final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
+ testKafkaConsumer.close();
+ runFuture.get();
+ } else {
+ try {
+ setupConsumer(testKafkaConsumer);
+ testKafkaConsumer.run(new TestSourceContext<>());
+
+ fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
+ } catch (Exception e) {
+ assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true));
+ }
+ testKafkaConsumer.close();
+ }
+ }
+
+ private void setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Exception {
setupConsumer(
consumer,
false,