You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/24 17:24:17 UTC

[flink] branch release-1.8 updated: [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 42cb6dd  [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
42cb6dd is described below

commit 42cb6ddb1018e734444943d5f23215e26a156ff4
Author: tison <wa...@gmail.com>
AuthorDate: Thu Sep 19 16:29:29 2019 +0800

    [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
    
    This closes #9719.
---
 .../org/apache/flink/yarn/YarnResourceManager.java    |  4 +++-
 .../apache/flink/yarn/YarnResourceManagerTest.java    | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index d054afe..a259d8a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -94,6 +94,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	 * In task executor we use the hostnames given by YARN consistently throughout akka */
 	static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
 
+	static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager.";
+
 	/** Default heartbeat interval between this resource manager and the YARN ResourceManager. */
 	private final int yarnHeartbeatIntervalMillis;
 
@@ -449,7 +451,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	@Override
 	public void onShutdownRequest() {
-		closeAsync();
+		onFatalError(new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
 	}
 
 	@Override
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index eb75b66..53f8e25 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -55,6 +56,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -100,7 +102,9 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static org.apache.flink.yarn.YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -386,6 +390,21 @@ public class YarnResourceManagerTest extends TestLogger {
 	}
 
 	@Test
+	public void testShutdownRequestCausesFatalError() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				resourceManager.onShutdownRequest();
+
+				Throwable t = testingFatalErrorHandler.getErrorFuture().get(2000L, TimeUnit.MILLISECONDS);
+				assertThat(ExceptionUtils.findThrowable(t, ResourceManagerException.class).isPresent(), is(true));
+				assertThat(ExceptionUtils.findThrowableWithMessage(t, ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+
+				testingFatalErrorHandler.clearError();
+			});
+		}};
+	}
+
+	@Test
 	public void testStopWorker() throws Exception {
 		new Context() {{
 			runTest(() -> {