Posted to issues@flink.apache.org by GJL <gi...@git.apache.org> on 2018/03/09 13:37:09 UTC
[GitHub] flink pull request #5675: [FLINK-7804][flip6] YarnResourceManager does not e...
GitHub user GJL opened a pull request:
https://github.com/apache/flink/pull/5675
[FLINK-7804][flip6] YarnResourceManager does not execute AMRMClientAsync callbacks in main thread
## What is the purpose of the change
*The `YarnResourceManager` registers callbacks with an `AMRMClientAsync`, which it uses to react to Yarn container allocations. These callbacks, e.g., `onContainersAllocated`, modify the internal state of the `YarnResourceManager`. Because they are not executed in the main thread, they can race with the `requestYarnContainer` method. To solve this problem, we have to execute the state-changing operations in the main thread of the `YarnResourceManager`.*
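
As an illustration of the approach described above, here is a minimal, self-contained sketch. It is not the Flink implementation: `SingleThreadedEndpoint`, its single-threaded executor, and `demo` are hypothetical stand-ins, and only the names `runAsync`, `onContainersAllocated`, and `numPendingContainerRequests` are borrowed from the diff. The idea it shows is that callbacks arriving on foreign threads only enqueue work, so plain fields are mutated by one thread only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for an endpoint whose state is owned by one "main" thread. */
class SingleThreadedEndpoint {
    // Assumption: a single-threaded executor plays the role of the main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Plain, unsynchronized state. Safe only because the main thread alone touches it.
    private int numPendingContainerRequests;
    private final Set<String> workers = new HashSet<>();

    /** Queues an action on the main thread. */
    void runAsync(Runnable action) {
        mainThread.execute(action);
    }

    /** May be called from any thread; the mutation itself runs on the main thread. */
    void requestContainer() {
        runAsync(() -> numPendingContainerRequests++);
    }

    /** Callback invoked on a foreign thread; it only hands the work over. */
    void onContainersAllocated(List<String> containerIds) {
        runAsync(() -> {
            for (String id : containerIds) {
                if (numPendingContainerRequests > 0) {
                    numPendingContainerRequests--;
                    workers.add(id);
                }
                // Otherwise the container is in excess and would be returned here.
            }
        });
    }

    /** Reads the counter on the main thread, after all previously queued actions. */
    int pendingRequests() throws Exception {
        return mainThread.submit(() -> numPendingContainerRequests).get();
    }

    /** Four caller threads request 1000 containers each, then 1500 are allocated. */
    static int demo() {
        SingleThreadedEndpoint endpoint = new SingleThreadedEndpoint();
        try {
            List<Thread> callers = new ArrayList<>();
            for (int t = 0; t < 4; t++) {
                Thread caller = new Thread(() -> {
                    for (int i = 0; i < 1000; i++) {
                        endpoint.requestContainer();
                    }
                });
                callers.add(caller);
                caller.start();
            }
            for (Thread caller : callers) {
                caller.join();
            }

            List<String> allocated = new ArrayList<>();
            for (int i = 0; i < 1500; i++) {
                allocated.add("container_" + i);
            }
            Thread callback = new Thread(() -> endpoint.onContainersAllocated(allocated));
            callback.start();
            callback.join();

            return endpoint.pendingRequests();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            endpoint.mainThread.shutdown();
        }
    }
}
```

In this sketch `SingleThreadedEndpoint.demo()` returns 2500 (4000 requests minus 1500 allocations). If the four caller threads incremented the plain `int` directly instead of going through `runAsync`, updates could be lost.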
## Brief change log
- *Run AMRMClientAsync callbacks in main thread*
- *Fix `YarnResourceManagerTest`*
## Verifying this change
This change is already covered by existing tests, such as `YarnResourceManagerTest`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/GJL/flink FLINK-7804
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5675.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5675
----
commit f6c61e0c7989d6d67f784818aa94cd112f600bff
Author: gyao <ga...@...>
Date: 2018-03-09T13:36:33Z
[FLINK-7804][flip6] Run AMRMClientAsync callbacks in main thread
----
---
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5675#discussion_r175020997
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,67 +328,74 @@ public float getProgress() {
@Override
public void onContainersCompleted(List<ContainerStatus> list) {
- for (ContainerStatus container : list) {
- if (container.getExitStatus() < 0) {
- closeTaskManagerConnection(new ResourceID(
- container.getContainerId().toString()), new Exception(container.getDiagnostics()));
+ runAsync(() -> {
+ for (ContainerStatus container : list) {
+ if (container.getExitStatus() < 0) {
+ closeTaskManagerConnection(new ResourceID(
+ container.getContainerId().toString()), new Exception(container.getDiagnostics()));
+ }
+ workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
+ }
}
- workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
- }
+ );
}
@Override
public void onContainersAllocated(List<Container> containers) {
- for (Container container : containers) {
- log.info(
- "Received new container: {} - Remaining pending container requests: {}",
- container.getId(),
- numPendingContainerRequests);
-
- if (numPendingContainerRequests > 0) {
- numPendingContainerRequests--;
-
- final String containerIdStr = container.getId().toString();
-
- workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
-
- try {
- // Context information used to start a TaskExecutor Java process
- ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
- container.getResource(),
- containerIdStr,
- container.getNodeId().getHost());
-
- nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
- } catch (Throwable t) {
- log.error("Could not start TaskManager in container {}.", container.getId(), t);
-
- // release the failed container
+ runAsync(() -> {
+ for (Container container : containers) {
+ log.info(
+ "Received new container: {} - Remaining pending container requests: {}",
+ container.getId(),
+ numPendingContainerRequests);
+
+ if (numPendingContainerRequests > 0) {
+ numPendingContainerRequests--;
+
+ final String containerIdStr = container.getId().toString();
+
+ workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
+
+ try {
+ // Context information used to start a TaskExecutor Java process
+ ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+ container.getResource(),
+ containerIdStr,
+ container.getNodeId().getHost());
+
+ nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+ } catch (Throwable t) {
+ log.error("Could not start TaskManager in container {}.", container.getId(), t);
+
+ // release the failed container
+ resourceManagerClient.releaseAssignedContainer(container.getId());
+ // and ask for a new one
+ requestYarnContainer(container.getResource(), container.getPriority());
+ }
+ } else {
+ // return the excessive containers
+ log.info("Returning excess container {}.", container.getId());
resourceManagerClient.releaseAssignedContainer(container.getId());
- // and ask for a new one
- requestYarnContainer(container.getResource(), container.getPriority());
}
- } else {
- // return the excessive containers
- log.info("Returning excess container {}.", container.getId());
- resourceManagerClient.releaseAssignedContainer(container.getId());
}
- }
- // if we are waiting for no further containers, we can go to the
- // regular heartbeat interval
- if (numPendingContainerRequests <= 0) {
- resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
- }
+ // if we are waiting for no further containers, we can go to the
+ // regular heartbeat interval
+ if (numPendingContainerRequests <= 0) {
+ resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
+ }
+ });
}
@Override
public void onShutdownRequest() {
- try {
- shutDown();
- } catch (Exception e) {
- log.warn("Fail to shutdown the YARN resource manager.", e);
- }
+ runAsync(() -> {
+ try {
+ shutDown();
--- End diff --
done
---
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5675#discussion_r174834772
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -395,10 +405,13 @@ public void onNodesUpdated(List<NodeReport> list) {
@Override
public void onError(Throwable error) {
- onFatalError(error);
+ runAsync(() -> onFatalError(error));
--- End diff --
I think it would be good to let the error propagate directly. In case of an OOM exception we want to quickly shut down the JVM.
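
A rough sketch of the difference between the two variants follows. It is not Flink code: `FatalErrorPropagation`, its executor, and `demo` are hypothetical, and the busy main thread is an assumed scenario chosen to make the delay visible. It shows that a queued handler waits behind work already scheduled on the main thread, while a direct call runs immediately on the calling thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical endpoint contrasting queued and direct fatal error handling. */
class FatalErrorPropagation {
    // Assumption: a single-threaded executor plays the role of the main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Atomic because the handler may run on a thread other than the main thread.
    private final AtomicReference<Throwable> fatalError = new AtomicReference<>();

    void runAsync(Runnable action) {
        mainThread.execute(action);
    }

    /** Records the first error only; a real handler would typically stop the process. */
    void onFatalError(Throwable error) {
        fatalError.compareAndSet(null, error);
    }

    /** Queued variant: the handler waits behind whatever is already scheduled. */
    void onErrorQueued(Throwable error) {
        runAsync(() -> onFatalError(error));
    }

    /** Direct variant: the handler runs immediately on the calling thread. */
    void onErrorDirect(Throwable error) {
        onFatalError(error);
    }

    /** Returns true if the queued error was delayed and the direct one was not. */
    static boolean demo() {
        FatalErrorPropagation endpoint = new FatalErrorPropagation();
        CountDownLatch busy = new CountDownLatch(1);
        try {
            // Keep the main thread occupied, as a long-running action would.
            endpoint.runAsync(() -> {
                try {
                    busy.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            endpoint.onErrorQueued(new IllegalStateException("queued"));
            boolean queuedIsDelayed = endpoint.fatalError.get() == null;

            Throwable direct = new IllegalStateException("direct");
            endpoint.onErrorDirect(direct);
            boolean directIsImmediate = endpoint.fatalError.get() == direct;

            return queuedIsDelayed && directIsImmediate;
        } finally {
            busy.countDown();
            endpoint.mainThread.shutdown();
        }
    }
}
```

The trade-off in the direct variant is that the handler must be safe to call from any thread, which is why this sketch stores the error in an `AtomicReference`.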
---
Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5675#discussion_r175020983
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -395,10 +405,13 @@ public void onNodesUpdated(List<NodeReport> list) {
@Override
public void onError(Throwable error) {
- onFatalError(error);
+ runAsync(() -> onFatalError(error));
--- End diff --
done
---
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5675
---
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5675#discussion_r174834515
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,67 +328,74 @@ public float getProgress() {
@Override
public void onContainersCompleted(List<ContainerStatus> list) {
- for (ContainerStatus container : list) {
- if (container.getExitStatus() < 0) {
- closeTaskManagerConnection(new ResourceID(
- container.getContainerId().toString()), new Exception(container.getDiagnostics()));
+ runAsync(() -> {
+ for (ContainerStatus container : list) {
+ if (container.getExitStatus() < 0) {
+ closeTaskManagerConnection(new ResourceID(
+ container.getContainerId().toString()), new Exception(container.getDiagnostics()));
+ }
+ workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
+ }
}
- workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
- }
+ );
}
@Override
public void onContainersAllocated(List<Container> containers) {
- for (Container container : containers) {
- log.info(
- "Received new container: {} - Remaining pending container requests: {}",
- container.getId(),
- numPendingContainerRequests);
-
- if (numPendingContainerRequests > 0) {
- numPendingContainerRequests--;
-
- final String containerIdStr = container.getId().toString();
-
- workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
-
- try {
- // Context information used to start a TaskExecutor Java process
- ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
- container.getResource(),
- containerIdStr,
- container.getNodeId().getHost());
-
- nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
- } catch (Throwable t) {
- log.error("Could not start TaskManager in container {}.", container.getId(), t);
-
- // release the failed container
+ runAsync(() -> {
+ for (Container container : containers) {
+ log.info(
+ "Received new container: {} - Remaining pending container requests: {}",
+ container.getId(),
+ numPendingContainerRequests);
+
+ if (numPendingContainerRequests > 0) {
+ numPendingContainerRequests--;
+
+ final String containerIdStr = container.getId().toString();
+
+ workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
+
+ try {
+ // Context information used to start a TaskExecutor Java process
+ ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+ container.getResource(),
+ containerIdStr,
+ container.getNodeId().getHost());
+
+ nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+ } catch (Throwable t) {
+ log.error("Could not start TaskManager in container {}.", container.getId(), t);
+
+ // release the failed container
+ resourceManagerClient.releaseAssignedContainer(container.getId());
+ // and ask for a new one
+ requestYarnContainer(container.getResource(), container.getPriority());
+ }
+ } else {
+ // return the excessive containers
+ log.info("Returning excess container {}.", container.getId());
resourceManagerClient.releaseAssignedContainer(container.getId());
- // and ask for a new one
- requestYarnContainer(container.getResource(), container.getPriority());
}
- } else {
- // return the excessive containers
- log.info("Returning excess container {}.", container.getId());
- resourceManagerClient.releaseAssignedContainer(container.getId());
}
- }
- // if we are waiting for no further containers, we can go to the
- // regular heartbeat interval
- if (numPendingContainerRequests <= 0) {
- resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
- }
+ // if we are waiting for no further containers, we can go to the
+ // regular heartbeat interval
+ if (numPendingContainerRequests <= 0) {
+ resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
+ }
+ });
}
@Override
public void onShutdownRequest() {
- try {
- shutDown();
- } catch (Exception e) {
- log.warn("Fail to shutdown the YARN resource manager.", e);
- }
+ runAsync(() -> {
+ try {
+ shutDown();
--- End diff --
`shutDown` can be called directly; it does not need to be wrapped in `runAsync`.
---