You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 10:47:21 UTC

[flink] 02/04: [FLINK-10774] [tests] Refactor Kafka tests to have consistent life cycle verifications

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 232560d0c95f9bccfb199a799ea39a89b44d671b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 29 11:45:27 2019 +0800

    [FLINK-10774] [tests] Refactor Kafka tests to have consistent life cycle verifications
---
 .../kafka/FlinkKafkaConsumerBaseTest.java          | 64 ++++++++++------------
 1 file changed, 28 insertions(+), 36 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b190d34..54b2c8a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -64,6 +64,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.ArrayDeque;
@@ -468,13 +469,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 
 		final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(failingPartitionDiscoverer);
 
-		try {
-			setupConsumer(consumer);
-			fail("Exception should be thrown in open method");
-		} catch (RuntimeException e) {
-			assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(failureCause)).isPresent(), is(true));
-		}
-		consumer.close();
+		testConsumerLifeCycle(consumer, failureCause);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", failingPartitionDiscoverer.isClosed());
 	}
 
@@ -490,15 +485,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 			testPartitionDiscoverer,
 			100L);
 
-		setupConsumer(consumer);
-
-		try {
-			consumer.run(new TestSourceContext<>());
-			fail("Exception should be thrown in run method");
-		} catch (Exception e) {
-			assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(failureCause)).isPresent(), is(true));
-		}
-		consumer.close();
+		testConsumerLifeCycle(consumer, failureCause);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
 	}
 
@@ -512,16 +499,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 
 		final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(() -> mock, testPartitionDiscoverer, 100L);
 
-		setupConsumer(consumer);
-
-		try {
-			consumer.run(new TestSourceContext<>());
-			fail("Exception should be thrown in run method");
-		} catch (Exception e) {
-			assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(failureCause)).isPresent(), is(true));
-		}
-		consumer.close();
-		consumer.joinDiscoveryLoopThread();
+		testConsumerLifeCycle(consumer, failureCause);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
 	}
 
@@ -531,19 +509,33 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 
 		final TestingFlinkKafkaConsumer<String> consumer = new TestingFlinkKafkaConsumer<>(testPartitionDiscoverer, 100L);
 
-		setupConsumer(consumer);
-
-		CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> consumer.run(new TestSourceContext<>())));
-
-		consumer.close();
-
-		consumer.joinDiscoveryLoopThread();
-		runFuture.get();
-
+		testConsumerLifeCycle(consumer, null);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
 	}
 
-	protected void setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Exception {
+	private void testConsumerLifeCycle(
+			FlinkKafkaConsumerBase<String> testKafkaConsumer,
+			@Nullable Exception expectedException) throws Exception {
+
+		if (expectedException == null) {
+			setupConsumer(testKafkaConsumer);
+			final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
+			testKafkaConsumer.close();
+			runFuture.get();
+		} else {
+			try {
+				setupConsumer(testKafkaConsumer);
+				testKafkaConsumer.run(new TestSourceContext<>());
+
+				fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
+			} catch (Exception e) {
+				assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true));
+			}
+			testKafkaConsumer.close();
+		}
+	}
+
+	private void setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Exception {
 		setupConsumer(
 			consumer,
 			false,