You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/09 12:31:57 UTC

[flink] 13/15: [hotfix][tests] Rework Process IO handling

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8b3106b426e4fc6246fa5a5e89eddb24ea741d9a
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 31 10:20:27 2019 +0100

    [hotfix][tests] Rework Process IO handling
---
 .../flink/tests/util/AutoClosableProcess.java      | 110 +++++++++++++++++----
 .../tests/PrometheusReporterEndToEndITCase.java    |  13 +--
 2 files changed, 99 insertions(+), 24 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
index fbeeb1d..533ffd0 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
@@ -20,16 +20,28 @@ package org.apache.flink.tests.util;
 
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 
 /**
  * Utility class to terminate a given {@link Process} when exiting a try-with-resources statement.
  */
 public class AutoClosableProcess implements AutoCloseable {
 
+	private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class);
+
 	private final Process process;
 
 	public AutoClosableProcess(final Process process) {
@@ -42,35 +54,97 @@ public class AutoClosableProcess implements AutoCloseable {
 	}
 
 	public static AutoClosableProcess runNonBlocking(String... commands) throws IOException {
-		return runNonBlocking(commands);
+		return create(commands).runNonBlocking();
 	}
 
-	public static Process runBlocking(String... commands) throws IOException {
-		return runBlocking(Duration.ofSeconds(30), commands);
+	public static void runBlocking(String... commands) throws IOException {
+		create(commands).runBlocking();
 	}
 
-	public static Process runBlocking(Duration timeout, String... commands) throws IOException {
-		final Process process = createProcess(commands);
+	public static AutoClosableProcessBuilder create(String... commands) {
+		return new AutoClosableProcessBuilder(commands);
+	}
 
-		try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
-			final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
-			if (!success) {
-				throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
-			}
-			if (process.exitValue() != 0) {
-				throw new RuntimeException("Process execution failed due error.");
+	/**
+	 * Builder for more sophisticated processes.
+	 */
+	public static final class AutoClosableProcessBuilder {
+		private final String[] commands;
+		private Consumer<String> stdoutProcessor = line -> {
+		};
+		private Consumer<String> stderrProcessor = line -> {
+		};
+
+		AutoClosableProcessBuilder(final String... commands) {
+			this.commands = commands;
+		}
+
+		public AutoClosableProcessBuilder setStdoutProcessor(final Consumer<String> stdoutProcessor) {
+			this.stdoutProcessor = stdoutProcessor;
+			return this;
+		}
+
+		public AutoClosableProcessBuilder setStderrProcessor(final Consumer<String> stderrProcessor) {
+			this.stderrProcessor = stderrProcessor;
+			return this;
+		}
+
+		public void runBlocking() throws IOException {
+			runBlocking(Duration.ofSeconds(30));
+		}
+
+		public void runBlocking(final Duration timeout) throws IOException {
+			final StringWriter sw = new StringWriter();
+			try (final PrintWriter printer = new PrintWriter(sw)) {
+				final Process process = createProcess(commands, stdoutProcessor, line -> {
+					stderrProcessor.accept(line);
+					printer.println(line);
+				});
+
+				try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
+					final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
+					if (!success) {
+						throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
+					}
+
+					if (process.exitValue() != 0) {
+						throw new IOException("Process execution failed due error. Error output:" + sw);
+					}
+				} catch (TimeoutException | InterruptedException e) {
+					throw new IOException("Process failed due to timeout.");
+				}
 			}
-		} catch (TimeoutException | InterruptedException e) {
-			throw new RuntimeException("Process failed due to timeout.");
 		}
-		return process;
+
+		public AutoClosableProcess runNonBlocking() throws IOException {
+			return new AutoClosableProcess(createProcess(commands, stdoutProcessor, stderrProcessor));
+		}
 	}
 
-	private static Process createProcess(String... commands) throws IOException {
+	private static Process createProcess(final String[] commands, Consumer<String> stdoutProcessor, Consumer<String> stderrProcessor) throws IOException {
 		final ProcessBuilder processBuilder = new ProcessBuilder();
 		processBuilder.command(commands);
-		processBuilder.inheritIO();
-		return processBuilder.start();
+
+		final Process process = processBuilder.start();
+
+		processStream(process.getInputStream(), stdoutProcessor);
+		processStream(process.getErrorStream(), stderrProcessor);
+
+		return process;
+	}
+
+	private static void processStream(final InputStream stream, final Consumer<String> streamConsumer) {
+		new Thread(() -> {
+			try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
+				String line;
+				while ((line = bufferedReader.readLine()) != null) {
+					streamConsumer.accept(line);
+				}
+			} catch (IOException e) {
+				LOG.error("Failure while processing process stdout/stderr.", e);
+			}
+		}
+		).start();
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
index 5d189de..258d293 100644
--- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -109,12 +109,13 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 		Files.createDirectory(tmpPrometheusDir);
 
 		LOG.info("Downloading Prometheus.");
-		runBlocking(
-			Duration.ofMinutes(5),
-			CommandLineWrapper
-				.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
-				.targetDir(tmpPrometheusDir)
-				.build());
+		AutoClosableProcess
+			.create(
+				CommandLineWrapper
+					.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
+					.targetDir(tmpPrometheusDir)
+					.build())
+			.runBlocking(Duration.ofMinutes(5));
 
 		LOG.info("Unpacking Prometheus.");
 		runBlocking(