You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 10:47:22 UTC
[flink] 03/04: [FLINK-10774] [tests] Test that Kafka partition
discoverer is wokeup before closed when concurrently accessed
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 83183ce4d96d497d48705a1d503cce3c9d24d508
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 29 12:10:55 2019 +0800
[FLINK-10774] [tests] Test that Kafka partition discoverer is wokeup before closed when concurrently accessed
---
.../kafka/FlinkKafkaConsumerBaseTest.java | 21 ++++++++++++++++++++-
1 file changed, 20 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 54b2c8a..af4fd98 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -493,7 +493,11 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
public void testClosePartitionDiscovererWhenKafkaFetcherFails() throws Exception {
final FlinkException failureCause = new FlinkException("Run Kafka fetcher failure.");
- final DummyPartitionDiscoverer testPartitionDiscoverer = new DummyPartitionDiscoverer();
+ // in this scenario, the partition discoverer will be concurrently accessed;
+ // use the WakeupBeforeCloseTestingPartitionDiscoverer to verify that we always call
+ // wakeup() before closing the discoverer
+ final WakeupBeforeCloseTestingPartitionDiscoverer testPartitionDiscoverer = new WakeupBeforeCloseTestingPartitionDiscoverer();
+
final AbstractFetcher<String, ?> mock = (AbstractFetcher<String, ?>) mock(AbstractFetcher.class);
doThrow(failureCause).when(mock).runFetchLoop();
@@ -739,6 +743,17 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
}
}
+ private static class WakeupBeforeCloseTestingPartitionDiscoverer extends DummyPartitionDiscoverer {
+ @Override
+ protected void closeConnections() {
+ if (!isWakedUp()) {
+ fail("Partition discoverer should have been waked up first before closing.");
+ }
+
+ super.closeConnections();
+ }
+ }
+
private static class DummyPartitionDiscoverer extends AbstractPartitionDiscoverer {
private final List<String> allTopics;
@@ -789,6 +804,10 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
boolean isClosed() {
return closed;
}
+
+ public boolean isWakedUp() {
+ return wakedUp;
+ }
}
private static class TestingFetcher<T, KPH> extends AbstractFetcher<T, KPH> {