You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/06/05 13:34:56 UTC

[flink] 01/05: [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6505fe43d1b0cc3cf8543caf3fbdb1deae9697c5
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jun 4 10:16:06 2020 +0200

    [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase
    
    Previously, reading could time out and return fewer messages than
    expected, which only showed up later as a test failure. Now we fail
    right at the source of the problem.
---
 .../org/apache/flink/tests/util/kafka/KafkaResource.java  |  7 ++++---
 .../tests/util/kafka/LocalStandaloneKafkaResource.java    | 15 ++++++++++-----
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
index 679d6c4..0157ad2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
@@ -63,15 +63,16 @@ public interface KafkaResource extends ExternalResource {
 	InetSocketAddress getZookeeperAddress();
 
 	/**
-	 * Reads up to {@code maxNumMessages} from the given topic.
+	 * Reads {@code expectedNumMessages} from the given topic. If we can't read the expected number
+	 * of messages we throw an exception.
 	 *
-	 * @param maxNumMessages maximum number of messages that should be read
+	 * @param expectedNumMessages expected number of messages that should be read
 	 * @param groupId group id to identify consumer
 	 * @param topic topic name
 	 * @return read messages
 	 * @throws IOException
 	 */
-	List<String> readMessage(int maxNumMessages, String groupId, String topic) throws IOException;
+	List<String> readMessage(int expectedNumMessages, String groupId, String topic) throws IOException;
 
 	/**
 	 * Modifies the number of partitions for the given topic.
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index 405690f..a651d12 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -260,8 +260,9 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 	}
 
 	@Override
-	public List<String> readMessage(int maxNumMessages, String groupId, String topic) throws IOException {
-		final List<String> messages = Collections.synchronizedList(new ArrayList<>(maxNumMessages));
+	public List<String> readMessage(int expectedNumMessages, String groupId, String topic) throws IOException {
+		final List<String> messages = Collections.synchronizedList(new ArrayList<>(
+				expectedNumMessages));
 
 		try (final AutoClosableProcess kafka = AutoClosableProcess
 			.create(kafkaDir.resolve(Paths.get("bin", "kafka-console-consumer.sh")).toString(),
@@ -269,7 +270,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 				KAFKA_ADDRESS,
 				"--from-beginning",
 				"--max-messages",
-				String.valueOf(maxNumMessages),
+				String.valueOf(expectedNumMessages),
 				"--topic",
 				topic,
 				"--consumer-property",
@@ -278,15 +279,19 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 			.runNonBlocking()) {
 
 			final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
-			while (deadline.hasTimeLeft() && messages.size() < maxNumMessages) {
+			while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) {
 				try {
-					LOG.info("Waiting for messages. Received {}/{}.", messages.size(), maxNumMessages);
+					LOG.info("Waiting for messages. Received {}/{}.", messages.size(),
+							expectedNumMessages);
 					Thread.sleep(500);
 				} catch (InterruptedException e) {
 					Thread.currentThread().interrupt();
 					break;
 				}
 			}
+			if (messages.size() != expectedNumMessages) {
+				throw new IOException("Could not read expected number of messages.");
+			}
 			return messages;
 		}
 	}