You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/09 12:31:57 UTC
[flink] 13/15: [hotfix][tests] Rework Process IO handling
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8b3106b426e4fc6246fa5a5e89eddb24ea741d9a
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 31 10:20:27 2019 +0100
[hotfix][tests] Rework Process IO handling
---
.../flink/tests/util/AutoClosableProcess.java | 110 +++++++++++++++++----
.../tests/PrometheusReporterEndToEndITCase.java | 13 +--
2 files changed, 99 insertions(+), 24 deletions(-)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
index fbeeb1d..533ffd0 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
@@ -20,16 +20,28 @@ package org.apache.flink.tests.util;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
/**
* Utility class to terminate a given {@link Process} when exiting a try-with-resources statement.
*/
public class AutoClosableProcess implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class);
+
private final Process process;
public AutoClosableProcess(final Process process) {
@@ -42,35 +54,97 @@ public class AutoClosableProcess implements AutoCloseable {
}
public static AutoClosableProcess runNonBlocking(String... commands) throws IOException {
- return runNonBlocking(commands);
+ return create(commands).runNonBlocking();
}
- public static Process runBlocking(String... commands) throws IOException {
- return runBlocking(Duration.ofSeconds(30), commands);
+ public static void runBlocking(String... commands) throws IOException {
+ create(commands).runBlocking();
}
- public static Process runBlocking(Duration timeout, String... commands) throws IOException {
- final Process process = createProcess(commands);
+ public static AutoClosableProcessBuilder create(String... commands) {
+ return new AutoClosableProcessBuilder(commands);
+ }
- try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
- final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
- if (!success) {
- throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
- }
- if (process.exitValue() != 0) {
- throw new RuntimeException("Process execution failed due error.");
+ /**
+ * Builder for most sophisticated processes.
+ */
+ public static final class AutoClosableProcessBuilder {
+ private final String[] commands;
+ private Consumer<String> stdoutProcessor = line -> {
+ };
+ private Consumer<String> stderrProcessor = line -> {
+ };
+
+ AutoClosableProcessBuilder(final String... commands) {
+ this.commands = commands;
+ }
+
+ public AutoClosableProcessBuilder setStdoutProcessor(final Consumer<String> stdoutProcessor) {
+ this.stdoutProcessor = stdoutProcessor;
+ return this;
+ }
+
+ public AutoClosableProcessBuilder setStderrProcessor(final Consumer<String> stderrProcessor) {
+ this.stderrProcessor = stderrProcessor;
+ return this;
+ }
+
+ public void runBlocking() throws IOException {
+ runBlocking(Duration.ofSeconds(30));
+ }
+
+ public void runBlocking(final Duration timeout) throws IOException {
+ final StringWriter sw = new StringWriter();
+ try (final PrintWriter printer = new PrintWriter(sw)) {
+ final Process process = createProcess(commands, stdoutProcessor, line -> {
+ stderrProcessor.accept(line);
+ printer.println(line);
+ });
+
+ try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
+ final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ if (!success) {
+ throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
+ }
+
+ if (process.exitValue() != 0) {
+ throw new IOException("Process execution failed due error. Error output:" + sw);
+ }
+ } catch (TimeoutException | InterruptedException e) {
+ throw new IOException("Process failed due to timeout.");
+ }
}
- } catch (TimeoutException | InterruptedException e) {
- throw new RuntimeException("Process failed due to timeout.");
}
- return process;
+
+ public AutoClosableProcess runNonBlocking() throws IOException {
+ return new AutoClosableProcess(createProcess(commands, stdoutProcessor, stderrProcessor));
+ }
}
- private static Process createProcess(String... commands) throws IOException {
+ private static Process createProcess(final String[] commands, Consumer<String> stdoutProcessor, Consumer<String> stderrProcessor) throws IOException {
final ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(commands);
- processBuilder.inheritIO();
- return processBuilder.start();
+
+ final Process process = processBuilder.start();
+
+ processStream(process.getInputStream(), stdoutProcessor);
+ processStream(process.getErrorStream(), stderrProcessor);
+
+ return process;
+ }
+
+ private static void processStream(final InputStream stream, final Consumer<String> streamConsumer) {
+ new Thread(() -> {
+ try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ streamConsumer.accept(line);
+ }
+ } catch (IOException e) {
+ LOG.error("Failure while processing process stdout/stderr.", e);
+ }
+ }
+ ).start();
}
@Override
diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
index 5d189de..258d293 100644
--- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -109,12 +109,13 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
Files.createDirectory(tmpPrometheusDir);
LOG.info("Downloading Prometheus.");
- runBlocking(
- Duration.ofMinutes(5),
- CommandLineWrapper
- .wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
- .targetDir(tmpPrometheusDir)
- .build());
+ AutoClosableProcess
+ .create(
+ CommandLineWrapper
+ .wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
+ .targetDir(tmpPrometheusDir)
+ .build())
+ .runBlocking(Duration.ofMinutes(5));
LOG.info("Unpacking Prometheus.");
runBlocking(