You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 10:47:22 UTC

[flink] 03/04: [FLINK-10774] [tests] Test that Kafka partition discoverer is wokeup before closed when concurrently accessed

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 83183ce4d96d497d48705a1d503cce3c9d24d508
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 29 12:10:55 2019 +0800

    [FLINK-10774] [tests] Test that Kafka partition discoverer is wokeup before closed when concurrently accessed
---
 .../kafka/FlinkKafkaConsumerBaseTest.java           | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 54b2c8a..af4fd98 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -493,7 +493,11 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 	public void testClosePartitionDiscovererWhenKafkaFetcherFails() throws Exception {
 		final FlinkException failureCause = new FlinkException("Run Kafka fetcher failure.");
 
-		final DummyPartitionDiscoverer testPartitionDiscoverer = new DummyPartitionDiscoverer();
+		// in this scenario, the partition discoverer will be concurrently accessed;
+		// use the WakeupBeforeCloseTestingPartitionDiscoverer to verify that we always call
+		// wakeup() before closing the discoverer
+		final WakeupBeforeCloseTestingPartitionDiscoverer testPartitionDiscoverer = new WakeupBeforeCloseTestingPartitionDiscoverer();
+
 		final AbstractFetcher<String, ?> mock = (AbstractFetcher<String, ?>) mock(AbstractFetcher.class);
 		doThrow(failureCause).when(mock).runFetchLoop();
 
@@ -739,6 +743,17 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 		}
 	}
 
+	private static class WakeupBeforeCloseTestingPartitionDiscoverer extends DummyPartitionDiscoverer {
+		@Override
+		protected void closeConnections() {
+			if (!isWakedUp()) {
+				fail("Partition discoverer should have been waked up first before closing.");
+			}
+
+			super.closeConnections();
+		}
+	}
+
 	private static class DummyPartitionDiscoverer extends AbstractPartitionDiscoverer {
 
 		private final List<String> allTopics;
@@ -789,6 +804,10 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
 		boolean isClosed() {
 			return closed;
 		}
+
+		public boolean isWakedUp() {
+			return wakedUp;
+		}
 	}
 
 	private static class TestingFetcher<T, KPH> extends AbstractFetcher<T, KPH> {