You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 10:47:23 UTC

[flink] 04/04: [FLINK-10774][tests] Refactor FlinkKafkaConsumerBaseTest#testConsumerLifeCycle

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3cbaabc527dee12a1a28ba5542cea60a939baf75
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Jan 30 17:43:20 2019 +0100

    [FLINK-10774][tests] Refactor FlinkKafkaConsumerBaseTest#testConsumerLifeCycle
    
    Split #testConsumerLifeCycle into two methods which represent the two if-else
    branches.
    
    This closes #7606.
---
 .../kafka/FlinkKafkaConsumerBaseTest.java          | 46 ++++++++++------------
 1 file changed, 21 insertions(+), 25 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index af4fd98..e59e2a6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -64,7 +64,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.ArrayDeque;
@@ -469,7 +468,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 
 		final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(failingPartitionDiscoverer);
 
-		testConsumerLifeCycle(consumer, failureCause);
+		testFailingConsumerLifecycle(consumer, failureCause);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", failingPartitionDiscoverer.isClosed());
 	}
 
@@ -485,7 +484,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 			testPartitionDiscoverer,
 			100L);
 
-		testConsumerLifeCycle(consumer, failureCause);
+		testFailingConsumerLifecycle(consumer, failureCause);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
 	}
 
@@ -503,40 +502,37 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 
 		final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(() -> mock, testPartitionDiscoverer, 100L);
 
-		testConsumerLifeCycle(consumer, failureCause);
+		testFailingConsumerLifecycle(consumer, failureCause);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
 	}
 
+	private void testFailingConsumerLifecycle(FlinkKafkaConsumerBase<String> testKafkaConsumer, @Nonnull Exception expectedException) throws Exception {
+		try {
+			setupConsumer(testKafkaConsumer);
+			testKafkaConsumer.run(new TestSourceContext<>());
+
+			fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
+		} catch (Exception e) {
+			assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true));
+		}
+		testKafkaConsumer.close();
+	}
+
 	@Test
 	public void testClosePartitionDiscovererWithCancellation() throws Exception {
 		final DummyPartitionDiscoverer testPartitionDiscoverer = new DummyPartitionDiscoverer();
 
 		final TestingFlinkKafkaConsumer<String> consumer = new TestingFlinkKafkaConsumer<>(testPartitionDiscoverer, 100L);
 
-		testConsumerLifeCycle(consumer, null);
+		testNormalConsumerLifecycle(consumer);
 		assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
 	}
 
-	private void testConsumerLifeCycle(
-			FlinkKafkaConsumerBase<String> testKafkaConsumer,
-			@Nullable Exception expectedException) throws Exception {
-
-		if (expectedException == null) {
-			setupConsumer(testKafkaConsumer);
-			final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
-			testKafkaConsumer.close();
-			runFuture.get();
-		} else {
-			try {
-				setupConsumer(testKafkaConsumer);
-				testKafkaConsumer.run(new TestSourceContext<>());
-
-				fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
-			} catch (Exception e) {
-				assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true));
-			}
-			testKafkaConsumer.close();
-		}
+	private void testNormalConsumerLifecycle(FlinkKafkaConsumerBase<String> testKafkaConsumer) throws Exception {
+		setupConsumer(testKafkaConsumer);
+		final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
+		testKafkaConsumer.close();
+		runFuture.get();
 	}
 
 	private void setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Exception {