Posted to issues@flink.apache.org by GJL <gi...@git.apache.org> on 2018/03/09 13:37:09 UTC

[GitHub] flink pull request #5675: [FLINK-7804][flip6] YarnResourceManager does not e...

GitHub user GJL opened a pull request:

    https://github.com/apache/flink/pull/5675

    [FLINK-7804][flip6] YarnResourceManager does not execute AMRMClientAsync callbacks in main thread

    ## What is the purpose of the change
    
    *The `YarnResourceManager` registers callbacks with an `AMRMClientAsync`, which it uses to react to Yarn container allocations. These callbacks, e.g., `onContainersAllocated`, modify the internal state of the `YarnResourceManager`. This can lead to race conditions with the `requestYarnContainer` method. To solve this problem, we have to execute the state-changing operations in the main thread of the `YarnResourceManager`.*
    
    
    ## Brief change log
      - *Run AMRMClientAsync callbacks in main thread*
      - *Fix `YarnResourceManagerTest`*
    
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `YarnResourceManagerTest`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
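
As a rough illustration of the approach described under "What is the purpose of the change", the following sketch funnels every state change through one thread. It is not Flink code: the class `MainThreadCallbackSketch`, its `simulate` method, and the single-threaded `ExecutorService` standing in for the resource manager's main thread are assumptions made for this example. Only the names `runAsync` and `numPendingContainerRequests` are borrowed from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a resource manager; not Flink's YarnResourceManager. */
public class MainThreadCallbackSketch {

    // Assumption: a single-threaded executor plays the role of the main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Plain int on purpose: it is only ever read or written on the main thread.
    private int numPendingContainerRequests;

    /** Stand-in for runAsync: enqueue the action for the main thread. */
    private void runAsync(Runnable action) {
        mainThread.execute(action);
    }

    public void requestContainer() {
        runAsync(() -> numPendingContainerRequests++);
    }

    /** Called from foreign (callback) threads; the state change is handed over. */
    public void onContainerAllocated() {
        runAsync(() -> {
            if (numPendingContainerRequests > 0) {
                numPendingContainerRequests--;
            }
        });
    }

    /** Waits for all queued actions, reads the counter on the main thread, shuts down. */
    public int drainAndGetPending() {
        try {
            return mainThread.submit(() -> numPendingContainerRequests).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            mainThread.shutdown();
        }
    }

    /** Issues all requests first, then fires allocation callbacks from several threads. */
    public static int simulate(int requests, int callbackThreads, int allocationsPerThread) {
        MainThreadCallbackSketch rm = new MainThreadCallbackSketch();
        for (int i = 0; i < requests; i++) {
            rm.requestContainer();
        }
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < callbackThreads; t++) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < allocationsPerThread; i++) {
                    rm.onContainerAllocated();
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return rm.drainAndGetPending();
    }

    public static void main(String[] args) {
        // 1000 requests, 4 callback threads with 150 allocations each.
        System.out.println(simulate(1000, 4, 150));
    }
}
```

Because the counter is only touched inside actions that run on the one executor thread, no locking or atomic type is needed, which appears to be the property the pull request is after.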


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GJL/flink FLINK-7804

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5675
    
----
commit f6c61e0c7989d6d67f784818aa94cd112f600bff
Author: gyao <ga...@...>
Date:   2018-03-09T13:36:33Z

    [FLINK-7804][flip6] Run AMRMClientAsync callbacks in main thread

----


---

[GitHub] flink pull request #5675: [FLINK-7804][flip6] YarnResourceManager does not e...

Posted by GJL <gi...@git.apache.org>.
GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5675#discussion_r175020997
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,67 +328,74 @@ public float getProgress() {
     
     	@Override
     	public void onContainersCompleted(List<ContainerStatus> list) {
    -		for (ContainerStatus container : list) {
    -			if (container.getExitStatus() < 0) {
    -				closeTaskManagerConnection(new ResourceID(
    -					container.getContainerId().toString()), new Exception(container.getDiagnostics()));
    +		runAsync(() -> {
    +				for (ContainerStatus container : list) {
    +					if (container.getExitStatus() < 0) {
    +						closeTaskManagerConnection(new ResourceID(
    +							container.getContainerId().toString()), new Exception(container.getDiagnostics()));
    +					}
    +					workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
    +				}
     			}
    -			workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
    -		}
    +		);
     	}
     
     	@Override
     	public void onContainersAllocated(List<Container> containers) {
    -		for (Container container : containers) {
    -			log.info(
    -				"Received new container: {} - Remaining pending container requests: {}",
    -				container.getId(),
    -				numPendingContainerRequests);
    -
    -			if (numPendingContainerRequests > 0) {
    -				numPendingContainerRequests--;
    -
    -				final String containerIdStr = container.getId().toString();
    -
    -				workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
    -
    -				try {
    -					// Context information used to start a TaskExecutor Java process
    -					ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    -						container.getResource(),
    -						containerIdStr,
    -						container.getNodeId().getHost());
    -
    -					nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -				} catch (Throwable t) {
    -					log.error("Could not start TaskManager in container {}.", container.getId(), t);
    -
    -					// release the failed container
    +		runAsync(() -> {
    +			for (Container container : containers) {
    +				log.info(
    +					"Received new container: {} - Remaining pending container requests: {}",
    +					container.getId(),
    +					numPendingContainerRequests);
    +
    +				if (numPendingContainerRequests > 0) {
    +					numPendingContainerRequests--;
    +
    +					final String containerIdStr = container.getId().toString();
    +
    +					workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
    +
    +					try {
    +						// Context information used to start a TaskExecutor Java process
    +						ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    +							container.getResource(),
    +							containerIdStr,
    +							container.getNodeId().getHost());
    +
    +						nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    +					} catch (Throwable t) {
    +						log.error("Could not start TaskManager in container {}.", container.getId(), t);
    +
    +						// release the failed container
    +						resourceManagerClient.releaseAssignedContainer(container.getId());
    +						// and ask for a new one
    +						requestYarnContainer(container.getResource(), container.getPriority());
    +					}
    +				} else {
    +					// return the excessive containers
    +					log.info("Returning excess container {}.", container.getId());
     					resourceManagerClient.releaseAssignedContainer(container.getId());
    -					// and ask for a new one
    -					requestYarnContainer(container.getResource(), container.getPriority());
     				}
    -			} else {
    -				// return the excessive containers
    -				log.info("Returning excess container {}.", container.getId());
    -				resourceManagerClient.releaseAssignedContainer(container.getId());
     			}
    -		}
     
    -		// if we are waiting for no further containers, we can go to the
    -		// regular heartbeat interval
    -		if (numPendingContainerRequests <= 0) {
    -			resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
    -		}
    +			// if we are waiting for no further containers, we can go to the
    +			// regular heartbeat interval
    +			if (numPendingContainerRequests <= 0) {
    +				resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
    +			}
    +		});
     	}
     
     	@Override
     	public void onShutdownRequest() {
    -		try {
    -			shutDown();
    -		} catch (Exception e) {
    -			log.warn("Fail to shutdown the YARN resource manager.", e);
    -		}
    +		runAsync(() -> {
    +			try {
    +				shutDown();
    --- End diff --
    
    done


---

[GitHub] flink pull request #5675: [FLINK-7804][flip6] YarnResourceManager does not e...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5675#discussion_r174834772
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -395,10 +405,13 @@ public void onNodesUpdated(List<NodeReport> list) {
     
     	@Override
     	public void onError(Throwable error) {
    -		onFatalError(error);
    +		runAsync(() -> onFatalError(error));
    --- End diff --
    
    I think it would be good to let the error propagate directly. In case of an OOM exception we want to quickly shut down the JVM.
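
One possible reading of this concern is that a handler queued behind other main-thread work only runs once that work has finished, while a handler invoked directly on the callback thread runs at once. The sketch below illustrates that ordering under stated assumptions: `FatalErrorPropagationSketch`, its `run` method, and the single-threaded `ExecutorService` used as the main thread are hypothetical and do not reflect how Flink's `onFatalError` is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of deferred versus direct error handling. */
public class FatalErrorPropagationSketch {

    public static List<String> run() {
        // Assumption: a single-threaded executor plays the role of the main thread.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch longTaskMayFinish = new CountDownLatch(1);

        // The main thread is busy with some long-running action.
        mainThread.execute(() -> {
            try {
                longTaskMayFinish.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("main thread finished long task");
        });

        // Deferred variant: the handler is queued behind the long-running action.
        mainThread.execute(() -> events.add("deferred handler ran"));

        // Direct variant: the handler runs immediately on the calling thread.
        events.add("direct handler ran");

        // Only now is the long-running action allowed to complete.
        longTaskMayFinish.countDown();
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the direct handler is recorded first and the deferred handler last, after the long-running action has completed.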


---

[GitHub] flink pull request #5675: [FLINK-7804][flip6] YarnResourceManager does not e...

Posted by GJL <gi...@git.apache.org>.
GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5675#discussion_r175020983
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -395,10 +405,13 @@ public void onNodesUpdated(List<NodeReport> list) {
     
     	@Override
     	public void onError(Throwable error) {
    -		onFatalError(error);
    +		runAsync(() -> onFatalError(error));
    --- End diff --
    
    done


---

[GitHub] flink pull request #5675: [FLINK-7804][flip6] YarnResourceManager does not e...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5675


---

[GitHub] flink pull request #5675: [FLINK-7804][flip6] YarnResourceManager does not e...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5675#discussion_r174834515
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,67 +328,74 @@ public float getProgress() {
     
     	@Override
     	public void onContainersCompleted(List<ContainerStatus> list) {
    -		for (ContainerStatus container : list) {
    -			if (container.getExitStatus() < 0) {
    -				closeTaskManagerConnection(new ResourceID(
    -					container.getContainerId().toString()), new Exception(container.getDiagnostics()));
    +		runAsync(() -> {
    +				for (ContainerStatus container : list) {
    +					if (container.getExitStatus() < 0) {
    +						closeTaskManagerConnection(new ResourceID(
    +							container.getContainerId().toString()), new Exception(container.getDiagnostics()));
    +					}
    +					workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
    +				}
     			}
    -			workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
    -		}
    +		);
     	}
     
     	@Override
     	public void onContainersAllocated(List<Container> containers) {
    -		for (Container container : containers) {
    -			log.info(
    -				"Received new container: {} - Remaining pending container requests: {}",
    -				container.getId(),
    -				numPendingContainerRequests);
    -
    -			if (numPendingContainerRequests > 0) {
    -				numPendingContainerRequests--;
    -
    -				final String containerIdStr = container.getId().toString();
    -
    -				workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
    -
    -				try {
    -					// Context information used to start a TaskExecutor Java process
    -					ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    -						container.getResource(),
    -						containerIdStr,
    -						container.getNodeId().getHost());
    -
    -					nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -				} catch (Throwable t) {
    -					log.error("Could not start TaskManager in container {}.", container.getId(), t);
    -
    -					// release the failed container
    +		runAsync(() -> {
    +			for (Container container : containers) {
    +				log.info(
    +					"Received new container: {} - Remaining pending container requests: {}",
    +					container.getId(),
    +					numPendingContainerRequests);
    +
    +				if (numPendingContainerRequests > 0) {
    +					numPendingContainerRequests--;
    +
    +					final String containerIdStr = container.getId().toString();
    +
    +					workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
    +
    +					try {
    +						// Context information used to start a TaskExecutor Java process
    +						ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    +							container.getResource(),
    +							containerIdStr,
    +							container.getNodeId().getHost());
    +
    +						nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    +					} catch (Throwable t) {
    +						log.error("Could not start TaskManager in container {}.", container.getId(), t);
    +
    +						// release the failed container
    +						resourceManagerClient.releaseAssignedContainer(container.getId());
    +						// and ask for a new one
    +						requestYarnContainer(container.getResource(), container.getPriority());
    +					}
    +				} else {
    +					// return the excessive containers
    +					log.info("Returning excess container {}.", container.getId());
     					resourceManagerClient.releaseAssignedContainer(container.getId());
    -					// and ask for a new one
    -					requestYarnContainer(container.getResource(), container.getPriority());
     				}
    -			} else {
    -				// return the excessive containers
    -				log.info("Returning excess container {}.", container.getId());
    -				resourceManagerClient.releaseAssignedContainer(container.getId());
     			}
    -		}
     
    -		// if we are waiting for no further containers, we can go to the
    -		// regular heartbeat interval
    -		if (numPendingContainerRequests <= 0) {
    -			resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
    -		}
    +			// if we are waiting for no further containers, we can go to the
    +			// regular heartbeat interval
    +			if (numPendingContainerRequests <= 0) {
    +				resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
    +			}
    +		});
     	}
     
     	@Override
     	public void onShutdownRequest() {
    -		try {
    -			shutDown();
    -		} catch (Exception e) {
    -			log.warn("Fail to shutdown the YARN resource manager.", e);
    -		}
    +		runAsync(() -> {
    +			try {
    +				shutDown();
    --- End diff --
    
    `shutDown` can be called directly.


---