You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/06/16 13:57:12 UTC

[flink] 01/02: [FLINK-18311] Fix StreamingKafkaITCase on Kafka 2.4.1

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2a2b5aed2751d055932efb813ba809cee304692
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Tue Jun 16 09:47:55 2020 +0200

    [FLINK-18311] Fix StreamingKafkaITCase on Kafka 2.4.1
    
    This was broken because the behaviour of the Kafka/ZooKeeper command
    line tools on Kafka 2.4.1 is slightly different:
    
    zookeeper-shell.sh does not print debug output to stderr as it did
    before. We change queryBrokerStatus() to instead consume stdout and
    check that we get valid information for the broker.
    
    The output of kafka-topics.sh now has a space between "PartitionCount:"
    and the partition count. Before we had "PartitionCount:2", now it's
    "PartitionCount: 2". We fix this by making the regex more lenient.
    
    This also splits the waiting on ZooKeeper/Kafka into two loops to better
    see which one we're blocking on.
---
 .../util/kafka/LocalStandaloneKafkaResource.java    | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index 5dc1137..eb18f7e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -139,9 +139,18 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 			kafkaDir.resolve(Paths.get("config", "server.properties")).toString()
 		);
 
-		while (!isZookeeperRunning(kafkaDir) || !isKafkaRunning(kafkaDir)) {
+		while (!isZookeeperRunning(kafkaDir)) {
 			try {
-				LOG.info("Waiting for kafka & zookeeper to start.");
+				LOG.info("Waiting for ZooKeeper to start.");
+				Thread.sleep(500L);
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				break;
+			}
+		}
+		while (!isKafkaRunning(kafkaDir)) {
+			try {
+				LOG.info("Waiting for Kafka to start.");
 				Thread.sleep(500L);
 			} catch (InterruptedException e) {
 				Thread.currentThread().interrupt();
@@ -199,7 +208,9 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 	private static boolean isKafkaRunning(final Path kafkaDir) throws IOException {
 		try {
 			final AtomicBoolean atomicBrokerStarted = new AtomicBoolean(false);
-			queryBrokerStatus(kafkaDir, line -> atomicBrokerStarted.compareAndSet(false, line.contains("dataLength =")));
+			queryBrokerStatus(kafkaDir, line -> {
+				atomicBrokerStarted.compareAndSet(false, line.contains("\"port\":"));
+			});
 			return atomicBrokerStarted.get();
 		} catch (final IOException ioe) {
 			// we get an exception if zookeeper isn't running
@@ -214,7 +225,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 				ZOOKEEPER_ADDRESS,
 				"get",
 				"/brokers/ids/0")
-			.setStderrProcessor(stderrProcessor)
+			.setStdoutProcessor(stderrProcessor)
 			.runBlocking();
 	}
 
@@ -312,7 +323,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 
 	@Override
 	public int getNumPartitions(String topic) throws IOException {
-		final Pattern partitionCountPattern = Pattern.compile(".*PartitionCount:([0-9]+).*");
+		final Pattern partitionCountPattern = Pattern.compile(".*PartitionCount:\\s*([0-9]+).*");
 		final AtomicReference<Integer> partitionCountFound = new AtomicReference<>(-1);
 		AutoClosableProcess
 			.create(kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString(),