You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/24 17:23:35 UTC
[flink] branch release-1.9 updated: [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 4d70447 [FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
4d70447 is described below
commit 4d70447f6daac7914dc3a8749a1d9963c61f8d4c
Author: tison <wa...@gmail.com>
AuthorDate: Thu Sep 19 16:29:29 2019 +0800
[FLINK-14010][coordination] YarnResourceManager#onShutdownRequest triggers fatal error
This closes #9719.
---
.../org/apache/flink/yarn/YarnResourceManager.java | 4 +++-
.../apache/flink/yarn/YarnResourceManagerTest.java | 20 ++++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index c7cb064..8aab7c7 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -97,6 +97,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
* In task executor we use the hostnames given by YARN consistently throughout akka */
static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+ static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager.";
+
/** Default heartbeat interval between this resource manager and the YARN ResourceManager. */
private final int yarnHeartbeatIntervalMillis;
@@ -455,7 +457,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
@Override
public void onShutdownRequest() {
- closeAsync();
+ onFatalError(new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
}
@Override
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index df7d85b..d8a106e 100755
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -49,6 +50,7 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
@@ -84,6 +86,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
@@ -92,7 +95,9 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static org.apache.flink.yarn.YarnResourceManager.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -344,6 +349,21 @@ public class YarnResourceManagerTest extends TestLogger {
}
@Test
+ public void testShutdownRequestCausesFatalError() throws Exception {
+ new Context() {{
+ runTest(() -> {
+ resourceManager.onShutdownRequest();
+
+ Throwable t = testingFatalErrorHandler.getErrorFuture().get(2000L, TimeUnit.MILLISECONDS);
+ assertThat(ExceptionUtils.findThrowable(t, ResourceManagerException.class).isPresent(), is(true));
+ assertThat(ExceptionUtils.findThrowableWithMessage(t, ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+
+ testingFatalErrorHandler.clearError();
+ });
+ }};
+ }
+
+ @Test
public void testStopWorker() throws Exception {
new Context() {{
runTest(() -> {