You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/07 15:26:35 UTC

[flink] branch release-1.8 updated (d084c9e -> 1e25889)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d084c9e  [hotfix][travis] Skip WebUI build process for e2e runs
     new a956a49  [FLINK-12219] Log uncaught exceptions and terminate in case Dispatcher#jobReachedGloballyTerminalState fails
     new 1e25889  [FLINK-12342][yarn] Add config option for heartbeat interval during container requests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/yarn_config_configuration.html       |  5 ++
 .../flink/runtime/concurrent/FutureUtils.java      | 26 ++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       | 38 ++++++++-------
 .../flink/runtime/concurrent/FutureUtilsTest.java  | 56 ++++++++++++++++++++++
 .../org/apache/flink/yarn/YarnResourceManager.java | 11 ++---
 .../yarn/configuration/YarnConfigOptions.java      | 18 +++++++
 6 files changed, 132 insertions(+), 22 deletions(-)


[flink] 02/02: [FLINK-12342][yarn] Add config option for heartbeat interval during container requests

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e25889796f1fb5e857b51158005e89d7a462595
Author: Peter Huang <hp...@uber.com>
AuthorDate: Wed May 1 13:50:08 2019 -0700

    [FLINK-12342][yarn] Add config option for heartbeat interval during container requests
    
    Flink's YarnResourceManager sets a faster heartbeat interval when it is requesting containers
    from Yarn's ResourceManager. Since requests and responses are transported via heartbeats, this
    speeds up requests. However, it can also put additional load on Yarn due to excessive container
    requests. Therefore, this commit introduces a config option which allows controlling this heartbeat interval.
---
 .../_includes/generated/yarn_config_configuration.html |  5 +++++
 .../org/apache/flink/yarn/YarnResourceManager.java     | 11 +++++------
 .../flink/yarn/configuration/YarnConfigOptions.java    | 18 ++++++++++++++++++
 3 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index ab7e224..cb73c86 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -38,6 +38,11 @@
             <td>Time between heartbeats with the ResourceManager in seconds.</td>
         </tr>
         <tr>
+            <td><h5>yarn.heartbeat.container-request-interval</h5></td>
+            <td style="word-wrap: break-word;">500</td>
+            <td>Time between heartbeats with the ResourceManager in milliseconds if Flink requests containers:<ul><li>The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats.</li><li>The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn.</li></ul>If you observe too many container allocations on the ResourceManager, t [...]
+        </tr>
+        <tr>
             <td><h5>yarn.maximum-failed-containers</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Maximum number of containers the system is going to reallocate in case of a failure.</td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 375c196..d054afe 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -86,10 +86,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	/** YARN container map. Package private for unit test purposes. */
 	private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
-
-	/** The heartbeat interval while the resource master is waiting for containers. */
-	private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
-
 	/** Environment variable name of the final container id used by the YarnResourceManager.
 	 * Container ID generation may vary across Hadoop versions. */
 	static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
@@ -114,6 +110,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	private final int defaultCpus;
 
+	/** The heartbeat interval while the resource master is waiting for containers. */
+	private final int containerRequestHeartbeatIntervalMillis;
+
 	/** Client to communicate with the Resource Manager (YARN's master). */
 	private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
 
@@ -173,6 +172,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
 		}
 		yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
+		containerRequestHeartbeatIntervalMillis = flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
 		numPendingContainerRequests = 0;
 
 		this.webInterfaceUrl = webInterfaceUrl;
@@ -514,8 +514,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		resourceManagerClient.addContainerRequest(getContainerRequest());
 
 		// make sure we transmit the request fast and receive fast news of granted allocations
-		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
-
+		resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
 		numPendingContainerRequests++;
 
 		log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 6abeb0d..bd543ca 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
 
 /**
  * This class holds configuration constants used by Flink's YARN runners.
@@ -108,6 +110,22 @@ public class YarnConfigOptions {
 		.withDescription("Time between heartbeats with the ResourceManager in seconds.");
 
 	/**
+	 * The heartbeat interval between the Application Master and the YARN Resource Manager
+	 * if Flink is requesting containers.
+	 */
+	public static final ConfigOption<Integer> CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS =
+		key("yarn.heartbeat.container-request-interval")
+			.defaultValue(500)
+			.withDescription(
+				new Description.DescriptionBuilder()
+					.text("Time between heartbeats with the ResourceManager in milliseconds if Flink requests containers:")
+					.list(
+						text("The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats."),
+						text("The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn."))
+					.text("If you observe too many container allocations on the ResourceManager, then it is recommended to increase this value. See %s for more information.", link("https://issues.apache.org/jira/browse/YARN-1902", "this link"))
+					.build());
+
+	/**
 	 * When a Flink job is submitted to YARN, the JobManager's host and the number of available
 	 * processing slots is written into a properties file, so that the Flink client is able
 	 * to pick those details up.


[flink] 01/02: [FLINK-12219] Log uncaught exceptions and terminate in case Dispatcher#jobReachedGloballyTerminalState fails

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a956a49876bb1733bb9354372c25c05d0d96d7be
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Apr 30 14:47:16 2019 +0200

    [FLINK-12219] Log uncaught exceptions and terminate in case Dispatcher#jobReachedGloballyTerminalState fails
    
    FutureUtils#assertNoException will assert that the given future has not been completed
    exceptionally. If it has been completed exceptionally, then it will call the
    FatalExitExceptionHandler.
    
    This commit uses assertNoException to assert that the Dispatcher#jobReachedGloballyTerminalState
    method has not failed.
    
    This closes #8334.
---
 .../flink/runtime/concurrent/FutureUtils.java      | 26 ++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       | 38 ++++++++-------
 .../flink/runtime/concurrent/FutureUtilsTest.java  | 56 ++++++++++++++++++++++
 3 files changed, 104 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 1458eab..c1613c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
@@ -969,4 +970,29 @@ public class FutureUtils {
 			return DELAYER.schedule(runnable, delay, timeUnit);
 		}
 	}
+
+	/**
+	 * Asserts that the given {@link CompletableFuture} is not completed exceptionally. If the future
+	 * is completed exceptionally, then it will call the {@link FatalExitExceptionHandler}.
+	 *
+	 * @param completableFuture to assert for no exceptions
+	 */
+	public static void assertNoException(CompletableFuture<?> completableFuture) {
+		handleUncaughtException(completableFuture, FatalExitExceptionHandler.INSTANCE);
+	}
+
+	/**
+	 * Checks that the given {@link CompletableFuture} is not completed exceptionally. If the future
+	 * is completed exceptionally, then it will call the given uncaught exception handler.
+	 *
+	 * @param completableFuture to assert for no exceptions
+	 * @param uncaughtExceptionHandler to call if the future is completed exceptionally
+	 */
+	public static void handleUncaughtException(CompletableFuture<?> completableFuture, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+		checkNotNull(completableFuture).whenComplete((ignored, throwable) -> {
+			if (throwable != null) {
+				uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), throwable);
+			}
+		});
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index a1799d9..c680b57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -364,26 +364,32 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
 		final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
-		jobManagerRunner.getResultFuture().whenCompleteAsync(
-			(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
-				// check if we are still the active JobManagerRunner by checking the identity
-				//noinspection ObjectEquality
-				if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
-					if (archivedExecutionGraph != null) {
-						jobReachedGloballyTerminalState(archivedExecutionGraph);
-					} else {
-						final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
 
-						if (strippedThrowable instanceof JobNotFinishedException) {
-							jobNotFinished(jobId);
+		FutureUtils.assertNoException(
+			jobManagerRunner.getResultFuture().handleAsync(
+				(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
+					// check if we are still the active JobManagerRunner by checking the identity
+					final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
+					final JobManagerRunner currentJobManagerRunner = jobManagerRunnerFuture != null ? jobManagerRunnerFuture.getNow(null) : null;
+					//noinspection ObjectEquality
+					if (jobManagerRunner == currentJobManagerRunner) {
+						if (archivedExecutionGraph != null) {
+							jobReachedGloballyTerminalState(archivedExecutionGraph);
 						} else {
-							jobMasterFailed(jobId, strippedThrowable);
+							final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+
+							if (strippedThrowable instanceof JobNotFinishedException) {
+								jobNotFinished(jobId);
+							} else {
+								jobMasterFailed(jobId, strippedThrowable);
+							}
 						}
+					} else {
+						log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
 					}
-				} else {
-					log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
-				}
-			}, getMainThreadExecutor());
+
+					return null;
+				}, getMainThreadExecutor()));
 
 		jobManagerRunner.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index bfbd62e..e16771c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -650,4 +650,60 @@ public class FutureUtilsTest extends TestLogger {
 		Assert.assertFalse(runWithExecutor.get());
 		Assert.assertTrue(continuationFuture.isDone());
 	}
+
+	@Test
+	public void testHandleUncaughtExceptionWithCompletedFuture() {
+		final CompletableFuture<String> future = CompletableFuture.completedFuture("foobar");
+		final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+		FutureUtils.handleUncaughtException(future, uncaughtExceptionHandler);
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(false));
+	}
+
+	@Test
+	public void testHandleUncaughtExceptionWithNormalCompletion() {
+		final CompletableFuture<String> future = new CompletableFuture<>();
+
+		final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+		FutureUtils.handleUncaughtException(future, uncaughtExceptionHandler);
+		future.complete("barfoo");
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(false));
+	}
+
+	@Test
+	public void testHandleUncaughtExceptionWithExceptionallyCompletedFuture() {
+		final CompletableFuture<String> future = FutureUtils.completedExceptionally(new FlinkException("foobar"));
+
+		final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+		FutureUtils.handleUncaughtException(future, uncaughtExceptionHandler);
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(true));
+	}
+
+	@Test
+	public void testHandleUncaughtExceptionWithExceptionallyCompletion() {
+		final CompletableFuture<String> future = new CompletableFuture<>();
+
+		final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+		FutureUtils.handleUncaughtException(future, uncaughtExceptionHandler);
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(false));
+		future.completeExceptionally(new FlinkException("barfoo"));
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(true));
+	}
+
+	private static class TestingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+		private Throwable exception = null;
+
+		@Override
+		public void uncaughtException(Thread t, Throwable e) {
+			exception = e;
+		}
+
+		private boolean hasBeenCalled() {
+			return exception != null;
+		}
+	}
 }