You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/24 17:23:35 UTC

[flink] branch release-1.9 updated: [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 4d70447  [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
4d70447 is described below

commit 4d70447f6daac7914dc3a8749a1d9963c61f8d4c
Author: tison <wa...@gmail.com>
AuthorDate: Thu Sep 19 16:29:29 2019 +0800

    [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
    
    This closes #9719.
---
 .../org/apache/flink/yarn/YarnResourceManager.java   |  4 +++-
 .../apache/flink/yarn/YarnResourceManagerTest.java   | 20 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index c7cb064..8aab7c7 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -97,6 +97,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	 * In task executor we use the hostnames given by YARN consistently throughout akka */
 	static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
 
+	static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager.";
+
 	/** Default heartbeat interval between this resource manager and the YARN ResourceManager. */
 	private final int yarnHeartbeatIntervalMillis;
 
@@ -455,7 +457,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	@Override
 	public void onShutdownRequest() {
-		closeAsync();
+		onFatalError(new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
 	}
 
 	@Override
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index df7d85b..d8a106e 100755
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -49,6 +50,7 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -84,6 +86,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
@@ -92,7 +95,9 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static org.apache.flink.yarn.YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -344,6 +349,21 @@ public class YarnResourceManagerTest extends TestLogger {
 	}
 
 	@Test
+	public void testShutdownRequestCausesFatalError() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				resourceManager.onShutdownRequest();
+
+				Throwable t = testingFatalErrorHandler.getErrorFuture().get(2000L, TimeUnit.MILLISECONDS);
+				assertThat(ExceptionUtils.findThrowable(t, ResourceManagerException.class).isPresent(), is(true));
+				assertThat(ExceptionUtils.findThrowableWithMessage(t, ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+
+				testingFatalErrorHandler.clearError();
+			});
+		}};
+	}
+
+	@Test
 	public void testStopWorker() throws Exception {
 		new Context() {{
 			runTest(() -> {