You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/06/05 13:34:55 UTC

[flink] branch release-1.11 updated (39e3d6a -> cc59b3a)

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 39e3d6a  [FLINK-18048] Fix --host option for standalone application cluster
     new 6505fe4  [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase
     new 31c3a15  [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase
     new e4034ac  [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes
     new a03c8f1  [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes
     new cc59b3a  [FLINK-18020] Increase timeout in SQLClientKafkaITCase

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/tests/util/kafka/KafkaResource.java    |  7 ++++---
 .../tests/util/kafka/LocalStandaloneKafkaResource.java  | 17 +++++++++++------
 .../flink/tests/util/kafka/SQLClientKafkaITCase.java    | 17 +++++++++++------
 .../flink/tests/util/kafka/StreamingKafkaITCase.java    |  8 ++++++--
 4 files changed, 32 insertions(+), 17 deletions(-)


[flink] 03/05: [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e4034ac296d4e8b43af570d3436cd2dec33d9ef1
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jun 4 10:34:16 2020 +0200

    [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes
    
    Duplicate topic names and leftover data could be potential sources of
    instability.
---
 .../org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java   | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
index 4f62839..5e64159 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
@@ -43,6 +43,7 @@ import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -65,6 +66,8 @@ public class StreamingKafkaITCase extends TestLogger {
 
 	private final Path kafkaExampleJar;
 
+	private final String kafkaVersion;
+
 	@Rule
 	public final KafkaResource kafka;
 
@@ -81,14 +84,15 @@ public class StreamingKafkaITCase extends TestLogger {
 	public StreamingKafkaITCase(final String kafkaExampleJarPattern, final String kafkaVersion) {
 		this.kafkaExampleJar = TestUtils.getResourceJar(kafkaExampleJarPattern);
 		this.kafka = KafkaResource.get(kafkaVersion);
+		this.kafkaVersion = kafkaVersion;
 	}
 
 	@Test
 	public void testKafka() throws Exception {
 		try (final ClusterController clusterController = flink.startCluster(1)) {
 
-			final String inputTopic = "test-input";
-			final String outputTopic = "test-output";
+			final String inputTopic = "test-input-" + kafkaVersion + "-" + UUID.randomUUID().toString();
+			final String outputTopic = "test-output-" + kafkaVersion + "-" + UUID.randomUUID().toString();
 
 			// create the required topics
 			kafka.createTopic(1, 1, inputTopic);


[flink] 02/05: [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31c3a15731e91d01fb033fe5a8f8173e3ba0cb38
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jun 4 10:33:24 2020 +0200

    [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase
---
 .../org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index a651d12..5dc1137 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -278,7 +278,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 			.setStdoutProcessor(messages::add)
 			.runNonBlocking()) {
 
-			final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+			final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
 			while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) {
 				try {
 					LOG.info("Waiting for messages. Received {}/{}.", messages.size(),


[flink] 04/05: [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a03c8f13f735bf5b8d120fb7aa95587574b6dfd6
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jun 4 15:20:19 2020 +0200

    [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes
    
    Duplicate topic names and leftover data could be a potential source of instabilities.
---
 .../org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java    | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index bf7358d..fabefc1 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -58,6 +58,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
 import static org.junit.Assert.assertThat;
@@ -92,6 +93,7 @@ public class SQLClientKafkaITCase extends TestLogger {
 	@Rule
 	public final TemporaryFolder tmp = new TemporaryFolder();
 
+	private final String kafkaVersion;
 	private final String kafkaSQLVersion;
 	private Path result;
 	private Path sqlClientSessionConf;
@@ -107,6 +109,7 @@ public class SQLClientKafkaITCase extends TestLogger {
 
 	public SQLClientKafkaITCase(String kafkaVersion, String kafkaSQLVersion, String kafkaSQLJarPattern) {
 		this.kafka = KafkaResource.get(kafkaVersion);
+		this.kafkaVersion = kafkaVersion;
 		this.kafkaSQLVersion = kafkaSQLVersion;
 
 		this.sqlConnectorKafkaJar = TestUtils.getResourceJar(kafkaSQLJarPattern);
@@ -129,8 +132,8 @@ public class SQLClientKafkaITCase extends TestLogger {
 	public void testKafka() throws Exception {
 		try (ClusterController clusterController = flink.startCluster(2)) {
 			// Create topic and send message
-			String testJsonTopic = "test-json";
-			String testAvroTopic = "test-avro";
+			String testJsonTopic = "test-json-" + kafkaVersion + "-" + UUID.randomUUID().toString();
+			String testAvroTopic = "test-avro-" + kafkaVersion + "-" + UUID.randomUUID().toString();
 			kafka.createTopic(1, 1, testJsonTopic);
 			String[] messages = new String[]{
 					"{\"timestamp\": \"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",


[flink] 01/05: [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6505fe43d1b0cc3cf8543caf3fbdb1deae9697c5
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jun 4 10:16:06 2020 +0200

    [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase
    
    Before, it could happen that we time out and return early, which would
    lead to a test failure. Now, we would fail at the source of the problem.
---
 .../org/apache/flink/tests/util/kafka/KafkaResource.java  |  7 ++++---
 .../tests/util/kafka/LocalStandaloneKafkaResource.java    | 15 ++++++++++-----
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
index 679d6c4..0157ad2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
@@ -63,15 +63,16 @@ public interface KafkaResource extends ExternalResource {
 	InetSocketAddress getZookeeperAddress();
 
 	/**
-	 * Reads up to {@code maxNumMessages} from the given topic.
+	 * Reads {@code expectedNumMessages} from the given topic. If we can't read the expected number
+	 * of messages we throw an exception.
 	 *
-	 * @param maxNumMessages maximum number of messages that should be read
+	 * @param expectedNumMessages expected number of messages that should be read
 	 * @param groupId group id to identify consumer
 	 * @param topic topic name
 	 * @return read messages
 	 * @throws IOException
 	 */
-	List<String> readMessage(int maxNumMessages, String groupId, String topic) throws IOException;
+	List<String> readMessage(int expectedNumMessages, String groupId, String topic) throws IOException;
 
 	/**
 	 * Modifies the number of partitions for the given topic.
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index 405690f..a651d12 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -260,8 +260,9 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 	}
 
 	@Override
-	public List<String> readMessage(int maxNumMessages, String groupId, String topic) throws IOException {
-		final List<String> messages = Collections.synchronizedList(new ArrayList<>(maxNumMessages));
+	public List<String> readMessage(int expectedNumMessages, String groupId, String topic) throws IOException {
+		final List<String> messages = Collections.synchronizedList(new ArrayList<>(
+				expectedNumMessages));
 
 		try (final AutoClosableProcess kafka = AutoClosableProcess
 			.create(kafkaDir.resolve(Paths.get("bin", "kafka-console-consumer.sh")).toString(),
@@ -269,7 +270,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 				KAFKA_ADDRESS,
 				"--from-beginning",
 				"--max-messages",
-				String.valueOf(maxNumMessages),
+				String.valueOf(expectedNumMessages),
 				"--topic",
 				topic,
 				"--consumer-property",
@@ -278,15 +279,19 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 			.runNonBlocking()) {
 
 			final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
-			while (deadline.hasTimeLeft() && messages.size() < maxNumMessages) {
+			while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) {
 				try {
-					LOG.info("Waiting for messages. Received {}/{}.", messages.size(), maxNumMessages);
+					LOG.info("Waiting for messages. Received {}/{}.", messages.size(),
+							expectedNumMessages);
 					Thread.sleep(500);
 				} catch (InterruptedException e) {
 					Thread.currentThread().interrupt();
 					break;
 				}
 			}
+			if (messages.size() != expectedNumMessages) {
+				throw new IOException("Could not read expected number of messages.");
+			}
 			return messages;
 		}
 	}


[flink] 05/05: [FLINK-18020] Increase timeout in SQLClientKafkaITCase

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc59b3a7c4853be80a9d2868ed9e7d6c09966edc
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jun 4 15:21:06 2020 +0200

    [FLINK-18020] Increase timeout in SQLClientKafkaITCase
---
 .../apache/flink/tests/util/kafka/SQLClientKafkaITCase.java    | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index fabefc1..f087e31 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.tests.util.kafka;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.cache.DownloadCache;
 import org.apache.flink.tests.util.categories.TravisGroup1;
@@ -52,6 +53,7 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -234,8 +236,8 @@ public class SQLClientKafkaITCase extends TestLogger {
 
 	private void checkCsvResultFile() throws Exception {
 		boolean success = false;
-		long maxRetries = 10, duration = 5000L;
-		for (int i = 0; i < maxRetries; i++) {
+		final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
+		while (!success && deadline.hasTimeLeft()) {
 			if (Files.exists(result)) {
 				byte[] bytes = Files.readAllBytes(result);
 				String[] lines = new String(bytes, Charsets.UTF_8).split("\n");
@@ -255,8 +257,8 @@ public class SQLClientKafkaITCase extends TestLogger {
 			} else {
 				LOG.info("The target CSV {} does not exist now", result);
 			}
-			Thread.sleep(duration);
+			Thread.sleep(500);
 		}
-		Assert.assertTrue("Timeout(" + (maxRetries * duration) + " sec) to read the correct CSV results.", success);
+		Assert.assertTrue("Did not get expected results before timeout.", success);
 	}
 }