You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/24 17:24:17 UTC
[flink] branch release-1.8 updated: [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 42cb6dd [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
42cb6dd is described below
commit 42cb6ddb1018e734444943d5f23215e26a156ff4
Author: tison <wa...@gmail.com>
AuthorDate: Thu Sep 19 16:29:29 2019 +0800
[FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
This closes #9719.
---
.../org/apache/flink/yarn/YarnResourceManager.java | 4 +++-
.../apache/flink/yarn/YarnResourceManagerTest.java | 19 +++++++++++++++++++
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index d054afe..a259d8a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -94,6 +94,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
* In task executor we use the hostnames given by YARN consistently throughout akka */
static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+ static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager.";
+
/** Default heartbeat interval between this resource manager and the YARN ResourceManager. */
private final int yarnHeartbeatIntervalMillis;
@@ -449,7 +451,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
@Override
public void onShutdownRequest() {
- closeAsync();
+ onFatalError(new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
}
@Override
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index eb75b66..53f8e25 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -55,6 +56,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
@@ -100,7 +102,9 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static org.apache.flink.yarn.YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -386,6 +390,21 @@ public class YarnResourceManagerTest extends TestLogger {
}
@Test
+ public void testShutdownRequestCausesFatalError() throws Exception {
+ new Context() {{
+ runTest(() -> {
+ resourceManager.onShutdownRequest();
+
+ Throwable t = testingFatalErrorHandler.getErrorFuture().get(2000L, TimeUnit.MILLISECONDS);
+ assertThat(ExceptionUtils.findThrowable(t, ResourceManagerException.class).isPresent(), is(true));
+ assertThat(ExceptionUtils.findThrowableWithMessage(t, ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+
+ testingFatalErrorHandler.clearError();
+ });
+ }};
+ }
+
+ @Test
public void testStopWorker() throws Exception {
new Context() {{
runTest(() -> {