You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/14 12:10:17 UTC

[10/12] flink git commit: [FLINK-5330] [tests] Harden KafkaConsumer08Test to fail reliably with unknown host exception

[FLINK-5330] [tests] Harden KafkaConsumer08Test to fail reliably with unknown host exception

Using static mocking to reliably fail the InetAddress.getByName call with an UnknowHostException.
Furthermore, the PR decreases the connection timeouts which speeds the test execution up.

This closes #2998


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef6b473a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef6b473a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef6b473a

Branch: refs/heads/master
Commit: ef6b473aa6705368a50930bd4205f7cc2e0f6584
Parents: a5cf88f
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Dec 13 15:48:06 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:33 2016 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumer08Test.java   | 38 +++++++++++++++++---
 1 file changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef6b473a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
index 9520f55..5ae74d7 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -21,15 +21,29 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.when;
 
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.Properties;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaConsumer08.class)
+@PowerMockIgnore("javax.management.*")
 public class KafkaConsumer08Test {
 
 	@Test
@@ -80,6 +94,7 @@ public class KafkaConsumer08Test {
 			props.setProperty("zookeeper.connect", "localhost:56794");
 			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
 			props.setProperty("group.id", "non-existent-group");
+			props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "1");
 
 			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
 			consumer.open(new Configuration());
@@ -93,10 +108,16 @@ public class KafkaConsumer08Test {
 	@Test
 	public void testAllBoostrapServerHostsAreInvalid() {
 		try {
+			String unknownHost = "foobar:11111";
+
+			URL unknownHostURL = NetUtils.getCorrectHostnamePort(unknownHost);
+
+			PowerMockito.mockStatic(InetAddress.class);
+			when(InetAddress.getByName(Matchers.eq(unknownHostURL.getHost()))).thenThrow(new UnknownHostException("Test exception"));
+
 			String zookeeperConnect = "localhost:56794";
-			String bootstrapServers = "indexistentHost:11111";
 			String groupId = "non-existent-group";
-			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
+			Properties props = createKafkaProps(zookeeperConnect, unknownHost, groupId);
 			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
 					new SimpleStringSchema(), props);
 			consumer.open(new Configuration());
@@ -112,9 +133,16 @@ public class KafkaConsumer08Test {
 	public void testAtLeastOneBootstrapServerHostIsValid() {
 		try {
 			String zookeeperConnect = "localhost:56794";
-			// we declare one valid boostrap server, namely the one with
+			String unknownHost = "foobar:11111";
+			// we declare one valid bootstrap server, namely the one with
 			// 'localhost'
-			String bootstrapServers = "indexistentHost:11111, localhost:22222";
+			String bootstrapServers = unknownHost + ", localhost:22222";
+
+			URL unknownHostURL = NetUtils.getCorrectHostnamePort(unknownHost);
+
+			PowerMockito.mockStatic(InetAddress.class);
+			when(InetAddress.getByName(Matchers.eq(unknownHostURL.getHost()))).thenThrow(new UnknownHostException("Test exception"));
+
 			String groupId = "non-existent-group";
 			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
 			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
@@ -134,6 +162,8 @@ public class KafkaConsumer08Test {
 		props.setProperty("zookeeper.connect", zookeeperConnect);
 		props.setProperty("bootstrap.servers", bootstrapServers);
 		props.setProperty("group.id", groupId);
+		props.setProperty("socket.timeout.ms", "100");
+		props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "1");
 		return props;
 	}
 }