You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/10 13:19:23 UTC

[flink] branch master updated (7113845 -> a810255)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7113845  [FLINK-12445][yarn] Cancel application on failure
     new 4bbc696  [hotfix][tests] Extend Tar wrapper to support strip argument
     new fb51839  [hotfix][build] Remove unnecessary version tag
     new 3ad3fbf  [hotfix][tests] Remove forced step logging in AutoClosableProcess
     new a810255  [hotfix][tests] Rework Process IO handling

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink-end-to-end-tests-common/pom.xml          |   1 -
 .../flink/tests/util/AutoClosableProcess.java      | 120 +++++++++++++++++----
 .../flink/tests/util/CommandLineWrapper.java       |  10 ++
 .../apache/flink/tests/util/FlinkDistribution.java |   6 +-
 .../tests/PrometheusReporterEndToEndITCase.java    |  26 +++--
 5 files changed, 126 insertions(+), 37 deletions(-)


[flink] 02/04: [hotfix][build] Remove unnecessary version tag

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb51839754bbfb973cc2690e87cfc73cef4e83b2
Author: zentol <ch...@apache.org>
AuthorDate: Tue Jan 29 13:53:12 2019 +0100

    [hotfix][build] Remove unnecessary version tag
---
 flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
index a04c43c..c94400e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
@@ -28,7 +28,6 @@ under the License.
 	<modelVersion>4.0.0</modelVersion>
 
 	<artifactId>flink-end-to-end-tests-common</artifactId>
-	<version>1.9-SNAPSHOT</version>
 
 	<dependencies>
 		<dependency>


[flink] 04/04: [hotfix][tests] Rework Process IO handling

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a810255cca67ce011706dd56a2be620ccc4832da
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 31 10:20:27 2019 +0100

    [hotfix][tests] Rework Process IO handling
---
 .../flink/tests/util/AutoClosableProcess.java      | 110 +++++++++++++++++----
 .../tests/PrometheusReporterEndToEndITCase.java    |  13 +--
 2 files changed, 99 insertions(+), 24 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
index fbeeb1d..533ffd0 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
@@ -20,16 +20,28 @@ package org.apache.flink.tests.util;
 
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 
 /**
  * Utility class to terminate a given {@link Process} when exiting a try-with-resources statement.
  */
 public class AutoClosableProcess implements AutoCloseable {
 
+	private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class);
+
 	private final Process process;
 
 	public AutoClosableProcess(final Process process) {
@@ -42,35 +54,97 @@ public class AutoClosableProcess implements AutoCloseable {
 	}
 
 	public static AutoClosableProcess runNonBlocking(String... commands) throws IOException {
-		return runNonBlocking(commands);
+		return create(commands).runNonBlocking();
 	}
 
-	public static Process runBlocking(String... commands) throws IOException {
-		return runBlocking(Duration.ofSeconds(30), commands);
+	public static void runBlocking(String... commands) throws IOException {
+		create(commands).runBlocking();
 	}
 
-	public static Process runBlocking(Duration timeout, String... commands) throws IOException {
-		final Process process = createProcess(commands);
+	public static AutoClosableProcessBuilder create(String... commands) {
+		return new AutoClosableProcessBuilder(commands);
+	}
 
-		try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
-			final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
-			if (!success) {
-				throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
-			}
-			if (process.exitValue() != 0) {
-				throw new RuntimeException("Process execution failed due error.");
+	/**
+	 * Builder for most sophisticated processes.
+	 */
+	public static final class AutoClosableProcessBuilder {
+		private final String[] commands;
+		private Consumer<String> stdoutProcessor = line -> {
+		};
+		private Consumer<String> stderrProcessor = line -> {
+		};
+
+		AutoClosableProcessBuilder(final String... commands) {
+			this.commands = commands;
+		}
+
+		public AutoClosableProcessBuilder setStdoutProcessor(final Consumer<String> stdoutProcessor) {
+			this.stdoutProcessor = stdoutProcessor;
+			return this;
+		}
+
+		public AutoClosableProcessBuilder setStderrProcessor(final Consumer<String> stderrProcessor) {
+			this.stderrProcessor = stderrProcessor;
+			return this;
+		}
+
+		public void runBlocking() throws IOException {
+			runBlocking(Duration.ofSeconds(30));
+		}
+
+		public void runBlocking(final Duration timeout) throws IOException {
+			final StringWriter sw = new StringWriter();
+			try (final PrintWriter printer = new PrintWriter(sw)) {
+				final Process process = createProcess(commands, stdoutProcessor, line -> {
+					stderrProcessor.accept(line);
+					printer.println(line);
+				});
+
+				try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
+					final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
+					if (!success) {
+						throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
+					}
+
+					if (process.exitValue() != 0) {
+						throw new IOException("Process execution failed due error. Error output:" + sw);
+					}
+				} catch (TimeoutException | InterruptedException e) {
+					throw new IOException("Process failed due to timeout.");
+				}
 			}
-		} catch (TimeoutException | InterruptedException e) {
-			throw new RuntimeException("Process failed due to timeout.");
 		}
-		return process;
+
+		public AutoClosableProcess runNonBlocking() throws IOException {
+			return new AutoClosableProcess(createProcess(commands, stdoutProcessor, stderrProcessor));
+		}
 	}
 
-	private static Process createProcess(String... commands) throws IOException {
+	private static Process createProcess(final String[] commands, Consumer<String> stdoutProcessor, Consumer<String> stderrProcessor) throws IOException {
 		final ProcessBuilder processBuilder = new ProcessBuilder();
 		processBuilder.command(commands);
-		processBuilder.inheritIO();
-		return processBuilder.start();
+
+		final Process process = processBuilder.start();
+
+		processStream(process.getInputStream(), stdoutProcessor);
+		processStream(process.getErrorStream(), stderrProcessor);
+
+		return process;
+	}
+
+	private static void processStream(final InputStream stream, final Consumer<String> streamConsumer) {
+		new Thread(() -> {
+			try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
+				String line;
+				while ((line = bufferedReader.readLine()) != null) {
+					streamConsumer.accept(line);
+				}
+			} catch (IOException e) {
+				LOG.error("Failure while processing process stdout/stderr.", e);
+			}
+		}
+		).start();
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
index 5d189de..258d293 100644
--- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -109,12 +109,13 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 		Files.createDirectory(tmpPrometheusDir);
 
 		LOG.info("Downloading Prometheus.");
-		runBlocking(
-			Duration.ofMinutes(5),
-			CommandLineWrapper
-				.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
-				.targetDir(tmpPrometheusDir)
-				.build());
+		AutoClosableProcess
+			.create(
+				CommandLineWrapper
+					.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
+					.targetDir(tmpPrometheusDir)
+					.build())
+			.runBlocking(Duration.ofMinutes(5));
 
 		LOG.info("Unpacking Prometheus.");
 		runBlocking(


[flink] 01/04: [hotfix][tests] Extend Tar wrapper to support strip argument

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4bbc69626eb439c8fe0e6fbf066139d72e8d9795
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 24 15:02:39 2019 +0100

    [hotfix][tests] Extend Tar wrapper to support strip argument
---
 .../java/org/apache/flink/tests/util/CommandLineWrapper.java   | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
index 50fd2f8..e9ee2e8 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
@@ -113,6 +113,7 @@ public enum CommandLineWrapper {
 		private boolean zipped = false;
 		private boolean extract = false;
 		private Path targetDir;
+		private int strips = -1;
 
 		public TarBuilder(final Path file) {
 			this.file = file;
@@ -133,6 +134,11 @@ public enum CommandLineWrapper {
 			return this;
 		}
 
+		public TarBuilder strip(final int num) {
+			strips = num;
+			return this;
+		}
+
 		public String[] build() {
 			final List<String> commandsList = new ArrayList<>(4);
 			commandsList.add("tar");
@@ -146,6 +152,10 @@ public enum CommandLineWrapper {
 				commandsList.add("--directory");
 				commandsList.add(targetDir.toAbsolutePath().toString());
 			}
+			if (strips > 0) {
+				commandsList.add("--strip");
+				commandsList.add(String.valueOf(strips));
+			}
 			commandsList.add("-f");
 			commandsList.add(file.toAbsolutePath().toString());
 			return commandsList.toArray(new String[commandsList.size()]);


[flink] 03/04: [hotfix][tests] Remove forced step logging in AutoClosableProcess

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3ad3fbf5f10be78db530fe70c6c29c2a45cd56c0
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 24 15:02:16 2019 +0100

    [hotfix][tests] Remove forced step logging in AutoClosableProcess
---
 .../flink/tests/util/AutoClosableProcess.java      | 46 +++++++++++-----------
 .../apache/flink/tests/util/FlinkDistribution.java |  6 ++-
 .../tests/PrometheusReporterEndToEndITCase.java    | 13 +++---
 3 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
index 0235930..fbeeb1d 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
@@ -20,9 +20,6 @@ package org.apache.flink.tests.util;
 
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
@@ -33,8 +30,6 @@ import java.util.concurrent.TimeoutException;
  */
 public class AutoClosableProcess implements AutoCloseable {
 
-	private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class);
-
 	private final Process process;
 
 	public AutoClosableProcess(final Process process) {
@@ -42,35 +37,40 @@ public class AutoClosableProcess implements AutoCloseable {
 		this.process = process;
 	}
 
-	public static AutoClosableProcess runNonBlocking(String step, String... commands) throws IOException {
-		LOG.info("Step Started: " + step);
-		Process process = new ProcessBuilder()
-			.command(commands)
-			.inheritIO()
-			.start();
-		return new AutoClosableProcess(process);
+	public Process getProcess() {
+		return process;
+	}
+
+	public static AutoClosableProcess runNonBlocking(String... commands) throws IOException {
+		return runNonBlocking(commands);
 	}
 
-	public static void runBlocking(String step, String... commands) throws IOException {
-		runBlocking(step, Duration.ofSeconds(30), commands);
+	public static Process runBlocking(String... commands) throws IOException {
+		return runBlocking(Duration.ofSeconds(30), commands);
 	}
 
-	public static void runBlocking(String step, Duration timeout, String... commands) throws IOException {
-		LOG.info("Step started: " + step);
-		Process process = new ProcessBuilder()
-			.command(commands)
-			.inheritIO()
-			.start();
+	public static Process runBlocking(Duration timeout, String... commands) throws IOException {
+		final Process process = createProcess(commands);
 
 		try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
 			final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
 			if (!success) {
-				throw new TimeoutException();
+				throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
+			}
+			if (process.exitValue() != 0) {
+				throw new RuntimeException("Process execution failed due error.");
 			}
 		} catch (TimeoutException | InterruptedException e) {
-			throw new RuntimeException(step + " failed due to timeout.");
+			throw new RuntimeException("Process failed due to timeout.");
 		}
-		LOG.info("Step complete: " + step);
+		return process;
+	}
+
+	private static Process createProcess(String... commands) throws IOException {
+		final ProcessBuilder processBuilder = new ProcessBuilder();
+		processBuilder.command(commands);
+		processBuilder.inheritIO();
+		return processBuilder.start();
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
index 8c4a39c..5192bf2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
@@ -125,7 +125,8 @@ public final class FlinkDistribution extends ExternalResource {
 	}
 
 	public void startFlinkCluster() throws IOException {
-		AutoClosableProcess.runBlocking("Start Flink cluster", bin.resolve("start-cluster.sh").toAbsolutePath().toString());
+		LOG.info("Starting Flink cluster.");
+		AutoClosableProcess.runBlocking(bin.resolve("start-cluster.sh").toAbsolutePath().toString());
 
 		final OkHttpClient client = new OkHttpClient();
 
@@ -163,7 +164,8 @@ public final class FlinkDistribution extends ExternalResource {
 	}
 
 	public void stopFlinkCluster() throws IOException {
-		AutoClosableProcess.runBlocking("Stop Flink Cluster", bin.resolve("stop-cluster.sh").toAbsolutePath().toString());
+		LOG.info("Stopping Flink cluster.");
+		AutoClosableProcess.runBlocking(bin.resolve("stop-cluster.sh").toAbsolutePath().toString());
 	}
 
 	public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException {
diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
index 269754e..5d189de 100644
--- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -108,15 +108,16 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 		final Path prometheusBinary = prometheusBinDir.resolve("prometheus");
 		Files.createDirectory(tmpPrometheusDir);
 
+		LOG.info("Downloading Prometheus.");
 		runBlocking(
-			"Download of Prometheus",
 			Duration.ofMinutes(5),
 			CommandLineWrapper
 				.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
 				.targetDir(tmpPrometheusDir)
 				.build());
 
-		runBlocking("Extraction of Prometheus archive",
+		LOG.info("Unpacking Prometheus.");
+		runBlocking(
 			CommandLineWrapper
 				.tar(prometheusArchive)
 				.extract()
@@ -124,7 +125,8 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 				.targetDir(tmpPrometheusDir)
 				.build());
 
-		runBlocking("Set Prometheus scrape interval",
+		LOG.info("Setting Prometheus scrape interval.");
+		runBlocking(
 			CommandLineWrapper
 				.sed("s/\\(scrape_interval:\\).*/\\1 1s/", prometheusConfig)
 				.inPlace()
@@ -141,14 +143,15 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 			.map(port -> "'localhost:" + port + "'")
 			.collect(Collectors.joining(", "));
 
-		runBlocking("Set Prometheus scrape targets to (" + scrapeTargets + ")",
+		LOG.info("Setting Prometheus scrape targets to {}.", scrapeTargets);
+		runBlocking(
 			CommandLineWrapper
 				.sed("s/\\(targets:\\).*/\\1 [" + scrapeTargets + "]/", prometheusConfig)
 				.inPlace()
 				.build());
 
+		LOG.info("Starting Prometheus server.");
 		try (AutoClosableProcess prometheus = runNonBlocking(
-			"Start Prometheus server",
 			prometheusBinary.toAbsolutePath().toString(),
 			"--config.file=" + prometheusConfig.toAbsolutePath(),
 			"--storage.tsdb.path=" + prometheusBinDir.resolve("data").toAbsolutePath())) {