You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this rendering.)
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/10 13:03:12 UTC

[flink] branch travis_jdk9_test updated (7d13faa -> b3ae6db)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git.


 discard 7d13faa  disable container e2e tests
 discard 6e6f648  prevent concurrent checkpoints
 discard 8b3106b  [hotfix][tests] Rework Process IO handling
 discard 14b3ee3  [hotfix][tests] Remove forced step logging in AutoClosableProcess
 discard 78a613b  deduplicate empty .out file check
 discard 6f30122  disable all container tests
     new 1d9d1f9  deduplicate empty .out file check
     new 4a7af64  [hotfix][tests] Remove forced step logging in AutoClosableProcess
     new 3a077ad  [hotfix][tests] Rework Process IO handling
     new 56c9710  prevent concurrent checkpoints
     new dc42b9e  disable container e2e tests
     new b3ae6db  [FLINK-12488][metrics] Pass Status group to NetworkEnvironment

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (7d13faa)
            \
             N -- N -- N   refs/heads/travis_jdk9_test (b3ae6db)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  Revisions
listed as "add" (there are none in this update) would already be
present in the repository and would only have been added to this reference.


Summary of changes:
 .../java/org/apache/flink/runtime/metrics/util/MetricUtils.java   | 5 +++--
 .../org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java  | 8 +++++---
 .../apache/flink/runtime/taskexecutor/TaskManagerServices.java    | 4 ++--
 tools/travis/splits/split_container.sh                            | 6 +++---
 4 files changed, 13 insertions(+), 10 deletions(-)


[flink] 02/06: [hotfix][tests] Remove forced step logging in AutoClosableProcess

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4a7af64e370a07ca1defc49f41ab4e8d90d07cf5
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 24 15:02:16 2019 +0100

    [hotfix][tests] Remove forced step logging in AutoClosableProcess
---
 .../flink/tests/util/AutoClosableProcess.java      | 46 +++++++++++-----------
 .../apache/flink/tests/util/FlinkDistribution.java |  6 ++-
 .../tests/PrometheusReporterEndToEndITCase.java    | 13 +++---
 3 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
index 0235930..fbeeb1d 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
@@ -20,9 +20,6 @@ package org.apache.flink.tests.util;
 
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
@@ -33,8 +30,6 @@ import java.util.concurrent.TimeoutException;
  */
 public class AutoClosableProcess implements AutoCloseable {
 
-	private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class);
-
 	private final Process process;
 
 	public AutoClosableProcess(final Process process) {
@@ -42,35 +37,40 @@ public class AutoClosableProcess implements AutoCloseable {
 		this.process = process;
 	}
 
-	public static AutoClosableProcess runNonBlocking(String step, String... commands) throws IOException {
-		LOG.info("Step Started: " + step);
-		Process process = new ProcessBuilder()
-			.command(commands)
-			.inheritIO()
-			.start();
-		return new AutoClosableProcess(process);
+	public Process getProcess() {
+		return process;
+	}
+
+	public static AutoClosableProcess runNonBlocking(String... commands) throws IOException {
+		return runNonBlocking(commands);
 	}
 
-	public static void runBlocking(String step, String... commands) throws IOException {
-		runBlocking(step, Duration.ofSeconds(30), commands);
+	public static Process runBlocking(String... commands) throws IOException {
+		return runBlocking(Duration.ofSeconds(30), commands);
 	}
 
-	public static void runBlocking(String step, Duration timeout, String... commands) throws IOException {
-		LOG.info("Step started: " + step);
-		Process process = new ProcessBuilder()
-			.command(commands)
-			.inheritIO()
-			.start();
+	public static Process runBlocking(Duration timeout, String... commands) throws IOException {
+		final Process process = createProcess(commands);
 
 		try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
 			final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
 			if (!success) {
-				throw new TimeoutException();
+				throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
+			}
+			if (process.exitValue() != 0) {
+				throw new RuntimeException("Process execution failed due error.");
 			}
 		} catch (TimeoutException | InterruptedException e) {
-			throw new RuntimeException(step + " failed due to timeout.");
+			throw new RuntimeException("Process failed due to timeout.");
 		}
-		LOG.info("Step complete: " + step);
+		return process;
+	}
+
+	private static Process createProcess(String... commands) throws IOException {
+		final ProcessBuilder processBuilder = new ProcessBuilder();
+		processBuilder.command(commands);
+		processBuilder.inheritIO();
+		return processBuilder.start();
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
index 8c4a39c..5192bf2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
@@ -125,7 +125,8 @@ public final class FlinkDistribution extends ExternalResource {
 	}
 
 	public void startFlinkCluster() throws IOException {
-		AutoClosableProcess.runBlocking("Start Flink cluster", bin.resolve("start-cluster.sh").toAbsolutePath().toString());
+		LOG.info("Starting Flink cluster.");
+		AutoClosableProcess.runBlocking(bin.resolve("start-cluster.sh").toAbsolutePath().toString());
 
 		final OkHttpClient client = new OkHttpClient();
 
@@ -163,7 +164,8 @@ public final class FlinkDistribution extends ExternalResource {
 	}
 
 	public void stopFlinkCluster() throws IOException {
-		AutoClosableProcess.runBlocking("Stop Flink Cluster", bin.resolve("stop-cluster.sh").toAbsolutePath().toString());
+		LOG.info("Stopping Flink cluster.");
+		AutoClosableProcess.runBlocking(bin.resolve("stop-cluster.sh").toAbsolutePath().toString());
 	}
 
 	public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException {
diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
index 269754e..5d189de 100644
--- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -108,15 +108,16 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 		final Path prometheusBinary = prometheusBinDir.resolve("prometheus");
 		Files.createDirectory(tmpPrometheusDir);
 
+		LOG.info("Downloading Prometheus.");
 		runBlocking(
-			"Download of Prometheus",
 			Duration.ofMinutes(5),
 			CommandLineWrapper
 				.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
 				.targetDir(tmpPrometheusDir)
 				.build());
 
-		runBlocking("Extraction of Prometheus archive",
+		LOG.info("Unpacking Prometheus.");
+		runBlocking(
 			CommandLineWrapper
 				.tar(prometheusArchive)
 				.extract()
@@ -124,7 +125,8 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 				.targetDir(tmpPrometheusDir)
 				.build());
 
-		runBlocking("Set Prometheus scrape interval",
+		LOG.info("Setting Prometheus scrape interval.");
+		runBlocking(
 			CommandLineWrapper
 				.sed("s/\\(scrape_interval:\\).*/\\1 1s/", prometheusConfig)
 				.inPlace()
@@ -141,14 +143,15 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 			.map(port -> "'localhost:" + port + "'")
 			.collect(Collectors.joining(", "));
 
-		runBlocking("Set Prometheus scrape targets to (" + scrapeTargets + ")",
+		LOG.info("Setting Prometheus scrape targets to {}.", scrapeTargets);
+		runBlocking(
 			CommandLineWrapper
 				.sed("s/\\(targets:\\).*/\\1 [" + scrapeTargets + "]/", prometheusConfig)
 				.inPlace()
 				.build());
 
+		LOG.info("Starting Prometheus server.");
 		try (AutoClosableProcess prometheus = runNonBlocking(
-			"Start Prometheus server",
 			prometheusBinary.toAbsolutePath().toString(),
 			"--config.file=" + prometheusConfig.toAbsolutePath(),
 			"--storage.tsdb.path=" + prometheusBinDir.resolve("data").toAbsolutePath())) {


[flink] 01/06: deduplicate empty .out file check

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d9d1f955b6bce4491fcecf92983ad52ec39b4fb
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 8 11:59:27 2019 +0200

    deduplicate empty .out file check
---
 flink-end-to-end-tests/test-scripts/common_ha.sh                  | 8 ++++----
 .../test-scripts/test_ha_per_job_cluster_datastream.sh            | 6 ++----
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common_ha.sh b/flink-end-to-end-tests/test-scripts/common_ha.sh
index 6521cb4..154ddb9 100644
--- a/flink-end-to-end-tests/test-scripts/common_ha.sh
+++ b/flink-end-to-end-tests/test-scripts/common_ha.sh
@@ -18,6 +18,8 @@
 # limitations under the License.
 ################################################################################
 
+source "${END_TO_END_DIR}"/test-scripts/common.sh
+
 # flag indicating if we have already cleared up things after a test
 CLEARED=0
 
@@ -59,10 +61,8 @@ function verify_logs() {
     local VERIFY_CHECKPOINTS=$2
 
     # verify that we have no alerts
-    if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
-        echo "FAILURE: Alerts found at the general purpose job."
-        EXIT_CODE=1
-    fi
+    check_logs_for_non_empty_out_files
+    EXIT_CODE=$(($EXIT_CODE+$?))
 
     # checks that all apart from the first JM recover the failed jobgraph.
     if ! verify_num_occurences_in_logs 'standalonesession' 'Recovered SubmittedJobGraph' ${JM_FAILURES}; then
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh b/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh
index 61dd8bc..66a6381 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_per_job_cluster_datastream.sh
@@ -69,10 +69,8 @@ function verify_logs_per_job() {
     local EXIT_CODE=0
 
     # verify that we have no alerts
-    if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
-        echo "FAILURE: Alerts found at the general purpose job."
-        EXIT_CODE=1
-    fi
+    check_logs_for_non_empty_out_files
+    EXIT_CODE=$(($EXIT_CODE+$?))
 
     # checks that all apart from the first JM recover the failed jobgraph.
     if ! verify_num_occurences_in_logs 'standalonejob' 'Found 0 checkpoints in ZooKeeper' 1; then


[flink] 06/06: [FLINK-12488][metrics] Pass Status group to NetworkEnvironment

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3ae6dbb13a1ab7ca06ff5190b574b4a6d8cbe3f
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 10 14:56:40 2019 +0200

    [FLINK-12488][metrics] Pass Status group to NetworkEnvironment
---
 .../java/org/apache/flink/runtime/metrics/util/MetricUtils.java   | 5 +++--
 .../org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java  | 8 +++++---
 .../apache/flink/runtime/taskexecutor/TaskManagerServices.java    | 4 ++--
 3 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index c30ef62..4ce8f5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.util;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
@@ -89,7 +90,7 @@ public class MetricUtils {
 		return jobManagerMetricGroup;
 	}
 
-	public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
+	public static Tuple2<TaskManagerMetricGroup, MetricGroup> instantiateTaskManagerMetricGroup(
 			MetricRegistry metricRegistry,
 			String hostName,
 			ResourceID resourceID,
@@ -107,7 +108,7 @@ public class MetricUtils {
 		if (systemResourceProbeInterval.isPresent()) {
 			instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get());
 		}
-		return taskManagerMetricGroup;
+		return Tuple2.of(taskManagerMetricGroup, statusGroup);
 	}
 
 	public static void instantiateStatusMetrics(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 3df99dd..4288573 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -20,12 +20,14 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -358,7 +360,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 				remoteAddress,
 				localCommunicationOnly);
 
-		TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
+		Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
 			metricRegistry,
 			TaskManagerLocation.getHostName(remoteAddress),
 			resourceID,
@@ -366,7 +368,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 
 		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 			taskManagerServicesConfiguration,
-			taskManagerMetricGroup,
+			taskManagerMetricGroup.f1,
 			resourceID,
 			rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
 			EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
@@ -382,7 +384,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			highAvailabilityServices,
 			taskManagerServices,
 			heartbeatServices,
-			taskManagerMetricGroup,
+			taskManagerMetricGroup.f0,
 			metricQueryServiceAddress,
 			blobCacheService,
 			fatalErrorHandler);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index a3e9066..c1cd8dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -232,7 +232,7 @@ public class TaskManagerServices {
 	 */
 	public static TaskManagerServices fromConfiguration(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
-			TaskManagerMetricGroup taskManagerMetricGroup,
+			MetricGroup taskManagerMetricGroup,
 			ResourceID resourceID,
 			Executor taskIOExecutor,
 			long freeHeapMemoryWithDefrag,


[flink] 05/06: disable container e2e tests

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc42b9ef6af300caf4076141d50af0b4f2dc67b0
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu May 9 12:08:10 2019 +0200

    disable container e2e tests
---
 .travis.yml | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 82d205c..7a03168 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -119,8 +119,5 @@ jobs:
       script: ./tools/travis/nightly.sh split_checkpoints.sh
       name: checkpoints
     - env: PROFILE="-Dinclude-kinesis -Djdk9 -Dhadoop.version=2.8.3"
-      script: ./tools/travis/nightly.sh split_container.sh
-      name: container
-    - env: PROFILE="-Dinclude-kinesis -Djdk9 -Dhadoop.version=2.8.3"
       script: ./tools/travis/nightly.sh split_heavy.sh
       name: heavy


[flink] 04/06: prevent concurrent checkpoints

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 56c9710a76c6bb0c1cd421472498cf9db49b29b2
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu May 9 11:50:57 2019 +0200

    prevent concurrent checkpoints
---
 .../apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java  | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 913d030..dac811b 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -251,6 +251,8 @@ public class DataStreamAllroundTestJobFactory {
 
 		env.enableCheckpointing(checkpointInterval, checkpointingMode);
 
+		env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50);
+
 		boolean enableExternalizedCheckpoints = pt.getBoolean(
 			ENVIRONMENT_EXTERNALIZE_CHECKPOINT.key(),
 			ENVIRONMENT_EXTERNALIZE_CHECKPOINT.defaultValue());


[flink] 03/06: [hotfix][tests] Rework Process IO handling

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch travis_jdk9_test
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3a077ad2e81b1ff53a1864c8e8e1d5e9916b37e1
Author: zentol <ch...@apache.org>
AuthorDate: Thu Jan 31 10:20:27 2019 +0100

    [hotfix][tests] Rework Process IO handling
---
 .../flink/tests/util/AutoClosableProcess.java      | 110 +++++++++++++++++----
 .../tests/PrometheusReporterEndToEndITCase.java    |  13 +--
 2 files changed, 99 insertions(+), 24 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
index fbeeb1d..533ffd0 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
@@ -20,16 +20,28 @@ package org.apache.flink.tests.util;
 
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 
 /**
  * Utility class to terminate a given {@link Process} when exiting a try-with-resources statement.
  */
 public class AutoClosableProcess implements AutoCloseable {
 
+	private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class);
+
 	private final Process process;
 
 	public AutoClosableProcess(final Process process) {
@@ -42,35 +54,97 @@ public class AutoClosableProcess implements AutoCloseable {
 	}
 
 	public static AutoClosableProcess runNonBlocking(String... commands) throws IOException {
-		return runNonBlocking(commands);
+		return create(commands).runNonBlocking();
 	}
 
-	public static Process runBlocking(String... commands) throws IOException {
-		return runBlocking(Duration.ofSeconds(30), commands);
+	public static void runBlocking(String... commands) throws IOException {
+		create(commands).runBlocking();
 	}
 
-	public static Process runBlocking(Duration timeout, String... commands) throws IOException {
-		final Process process = createProcess(commands);
+	public static AutoClosableProcessBuilder create(String... commands) {
+		return new AutoClosableProcessBuilder(commands);
+	}
 
-		try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
-			final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
-			if (!success) {
-				throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
-			}
-			if (process.exitValue() != 0) {
-				throw new RuntimeException("Process execution failed due error.");
+	/**
+	 * Builder for most sophisticated processes.
+	 */
+	public static final class AutoClosableProcessBuilder {
+		private final String[] commands;
+		private Consumer<String> stdoutProcessor = line -> {
+		};
+		private Consumer<String> stderrProcessor = line -> {
+		};
+
+		AutoClosableProcessBuilder(final String... commands) {
+			this.commands = commands;
+		}
+
+		public AutoClosableProcessBuilder setStdoutProcessor(final Consumer<String> stdoutProcessor) {
+			this.stdoutProcessor = stdoutProcessor;
+			return this;
+		}
+
+		public AutoClosableProcessBuilder setStderrProcessor(final Consumer<String> stderrProcessor) {
+			this.stderrProcessor = stderrProcessor;
+			return this;
+		}
+
+		public void runBlocking() throws IOException {
+			runBlocking(Duration.ofSeconds(30));
+		}
+
+		public void runBlocking(final Duration timeout) throws IOException {
+			final StringWriter sw = new StringWriter();
+			try (final PrintWriter printer = new PrintWriter(sw)) {
+				final Process process = createProcess(commands, stdoutProcessor, line -> {
+					stderrProcessor.accept(line);
+					printer.println(line);
+				});
+
+				try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
+					final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
+					if (!success) {
+						throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
+					}
+
+					if (process.exitValue() != 0) {
+						throw new IOException("Process execution failed due error. Error output:" + sw);
+					}
+				} catch (TimeoutException | InterruptedException e) {
+					throw new IOException("Process failed due to timeout.");
+				}
 			}
-		} catch (TimeoutException | InterruptedException e) {
-			throw new RuntimeException("Process failed due to timeout.");
 		}
-		return process;
+
+		public AutoClosableProcess runNonBlocking() throws IOException {
+			return new AutoClosableProcess(createProcess(commands, stdoutProcessor, stderrProcessor));
+		}
 	}
 
-	private static Process createProcess(String... commands) throws IOException {
+	private static Process createProcess(final String[] commands, Consumer<String> stdoutProcessor, Consumer<String> stderrProcessor) throws IOException {
 		final ProcessBuilder processBuilder = new ProcessBuilder();
 		processBuilder.command(commands);
-		processBuilder.inheritIO();
-		return processBuilder.start();
+
+		final Process process = processBuilder.start();
+
+		processStream(process.getInputStream(), stdoutProcessor);
+		processStream(process.getErrorStream(), stderrProcessor);
+
+		return process;
+	}
+
+	private static void processStream(final InputStream stream, final Consumer<String> streamConsumer) {
+		new Thread(() -> {
+			try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
+				String line;
+				while ((line = bufferedReader.readLine()) != null) {
+					streamConsumer.accept(line);
+				}
+			} catch (IOException e) {
+				LOG.error("Failure while processing process stdout/stderr.", e);
+			}
+		}
+		).start();
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
index 5d189de..258d293 100644
--- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -109,12 +109,13 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
 		Files.createDirectory(tmpPrometheusDir);
 
 		LOG.info("Downloading Prometheus.");
-		runBlocking(
-			Duration.ofMinutes(5),
-			CommandLineWrapper
-				.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
-				.targetDir(tmpPrometheusDir)
-				.build());
+		AutoClosableProcess
+			.create(
+				CommandLineWrapper
+					.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
+					.targetDir(tmpPrometheusDir)
+					.build())
+			.runBlocking(Duration.ofMinutes(5));
 
 		LOG.info("Unpacking Prometheus.");
 		runBlocking(