Posted to issues@flink.apache.org by Clarkkkkk <gi...@git.apache.org> on 2018/06/21 08:20:09 UTC

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

GitHub user Clarkkkkk opened a pull request:

    https://github.com/apache/flink/pull/6192

    [FLINK-9567][runtime][yarn] Fix the bug that Flink does not release Yarn container in some cases 

    ## What is the purpose of the change
    
      - This pull request responds to [JIRA issue FLINK-9567](https://issues.apache.org/jira/browse/FLINK-9567).

      - This pull request fixes a bug where Flink does not release the TaskManager container in some specific cases.
    
    ## Brief change log
    
      - Modify the onContainerCompleted method in YarnResourceManager.
    
      - Add a checkWorkerRegistrationWithResourceId method in ResourceManager that checks the status of the task executor with a specific resourceId.
    
      - Add a triggerTaskManagerHeartbeatTimeout method in ResourceManager, annotated @VisibleForTesting, which emulates the TaskManager heartbeat timeout that leads to the problem.
    
    ## Verifying this change
    This change is covered by the testOnContainerCompleted test added in YarnResourceManagerTest.
    
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): (no)
    
      - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
    
      - The serializers: (no)
    
      - The runtime per-record code paths (performance sensitive): (no)
    
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    
      - The S3 file system connector: (no)
    
    ## Documentation
      - Does this pull request introduce a new feature? (No)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Clarkkkkk/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6192.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6192
    
----
commit 6eaac2b5bc06ef6dffef5eb489668b0d81ac179b
Author: yangshimin <ya...@...>
Date:   2018-06-21T03:36:01Z

    [FLINK-9567][runtime][yarn]Fix the bug that Flink does not release Yarn container in some cases

----


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by Clarkkkkk <gi...@git.apache.org>.
Github user Clarkkkkk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199035271
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception {
     			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
     		}};
     	}
    +
    +	@Test
    +	public void testOnContainerCompleted() throws Exception {
    +		new Context() {{
    +			startResourceManager();
    +			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
    +				rmServices.slotManager.registerSlotRequest(
    +					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
    +				return null;
    +			});
    +			// wait for the registerSlotRequest completion
    +			registerSlotRequestFuture.get();
    +			// Callback from YARN when container is allocated.
    +			Container testingContainer = mock(Container.class);
    +			when(testingContainer.getId()).thenReturn(
    +				ContainerId.newInstance(
    +					ApplicationAttemptId.newInstance(
    +						ApplicationId.newInstance(System.currentTimeMillis(), 1),
    +						1),
    +					1));
    +			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
    +			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
    +			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
    +
    +			ImmutableList<Container> testingContainerList = ImmutableList.of(testingContainer);
    +			resourceManager.onContainersAllocated(testingContainerList);
    +			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
    +			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
    +
    +			// Remote task executor registers with YarnResourceManager.
    +			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
    --- End diff --
    
    Sure, I will modify it later.


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r198914433
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -1120,5 +1120,28 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     			return CompletableFuture.completedFuture(null);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Work Registration status checking
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Check if the executor with given resourceID is still in taskExecutors map
    +	 * @param resourceID an ID mapping to a task executor
    +	 * @return
    +	 */
    +	protected boolean checkWorkerRegistrationWithResourceId(ResourceID resourceID) {
    +		boolean status = taskExecutors.containsKey(resourceID);
    +		if (!status) {
    +			log.debug("No open TaskExecutor connection {}. Ignoring close TaskExecutor connection.", resourceID);
    +		}
    +		return status;
    +	}
    +
    +	@VisibleForTesting
    +	public void triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) {
    --- End diff --
    
    Let's not add this method, which is only used for testing purposes, to the production code. Instead, I would suggest subclassing `ResourceManager` in your test and adding this method there.
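
    A minimal sketch of that suggestion (the `runInMainThread` helper mirrors the one visible in the quoted test, and the constructor is elided, so treat this as an illustration rather than the actual test code):

        // Test-only subclass exposing a hook to simulate a TaskManager
        // heartbeat timeout without adding the hook to production code.
        static class TestingYarnResourceManager extends YarnResourceManager {
            // ... constructor simply delegating to YarnResourceManager ...

            CompletableFuture<Void> triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) {
                // Run in the endpoint's main thread, like the other
                // state-changing calls in the test, and close the
                // connection exactly as a heartbeat timeout would.
                return runInMainThread(() -> {
                    closeTaskManagerConnection(
                        resourceID,
                        new TimeoutException("TaskManager heartbeat timed out."));
                    return null;
                });
            }
        }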


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by Clarkkkkk <gi...@git.apache.org>.
Github user Clarkkkkk closed the pull request at:

    https://github.com/apache/flink/pull/6192


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199168161
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
     					if (yarnWorkerNode != null) {
     						// Container completed unexpectedly ~> start a new one
     						final Container container = yarnWorkerNode.getContainer();
    -						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
    -						closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
    +						// check WorkerRegistration status to avoid requesting containers more than required
    +						if (checkWorkerRegistrationWithResourceId(resourceId)) {
    --- End diff --
    
    Instead of simply counting how many pending container requests we have, we could ask the `SlotManager` how many pending slot allocations it has. If the number of pending slot allocations is lower than the product of slots per TaskManager x pending container requests, then we would not have to restart the container.
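
    A minimal sketch of this heuristic (the names `getNumberPendingSlotRequests`, `numPendingContainerRequests`, and `numberOfSlotsPerTaskManager` are placeholders for illustration; the actual fields and accessors may differ):

        // Decide, inside onContainersCompleted, whether a replacement
        // container is actually needed for a failed worker.
        private boolean replacementContainerNeeded() {
            // Slot requests that no registered TaskManager can serve yet.
            final int pendingSlotRequests = slotManager.getNumberPendingSlotRequests();
            // Slots that the already pending container requests will
            // provide once those containers come up.
            final int pendingSlots = numPendingContainerRequests * numberOfSlotsPerTaskManager;
            // Only ask YARN for another container if the pending container
            // requests cannot cover the outstanding slot demand.
            return pendingSlotRequests > pendingSlots;
        }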


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by Clarkkkkk <gi...@git.apache.org>.
Github user Clarkkkkk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199190175
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
     					if (yarnWorkerNode != null) {
     						// Container completed unexpectedly ~> start a new one
     						final Container container = yarnWorkerNode.getContainer();
    -						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
    -						closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
    +						// check WorkerRegistration status to avoid requesting containers more than required
    +						if (checkWorkerRegistrationWithResourceId(resourceId)) {
    --- End diff --
    
    I will work on the implementation of the idea you mentioned above which makes perfect sense to me.


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by Clarkkkkk <gi...@git.apache.org>.
Github user Clarkkkkk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199189703
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
     					if (yarnWorkerNode != null) {
     						// Container completed unexpectedly ~> start a new one
     						final Container container = yarnWorkerNode.getContainer();
    -						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
    -						closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
    +						// check WorkerRegistration status to avoid requesting containers more than required
    +						if (checkWorkerRegistrationWithResourceId(resourceId)) {
    --- End diff --
    
    Yes, you are right. I have been thinking all week about how to add a global version state to a container, but it feels like I came to a dead end. I found this issue when higher-priority batch jobs in the YARN cluster occupied all the resources and caused YARN to kill the Flink TaskManager containers frequently over a period of time.


---

[GitHub] flink issue #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink does n...

Posted by Clarkkkkk <gi...@git.apache.org>.
Github user Clarkkkkk commented on the issue:

    https://github.com/apache/flink/pull/6192
  
    @tillrohrmann I think this problem is caused by an inconsistency between the taskExecutors instance variable of the ResourceManager class and the workerNodeMap of the YarnResourceManager class. What do you think about my fix?


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199166104
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
     					if (yarnWorkerNode != null) {
     						// Container completed unexpectedly ~> start a new one
     						final Container container = yarnWorkerNode.getContainer();
    -						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
    -						closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
    +						// check WorkerRegistration status to avoid requesting containers more than required
    +						if (checkWorkerRegistrationWithResourceId(resourceId)) {
    --- End diff --
    
    Yes, I think it is not possible to distinguish between a container which was released but has not been completed before a recovery and a container failure just after recovery without some kind of state (in both cases we retrieve the containers from the previous attempt and get a onContainerCompleted signal from the container).
    
    My question would be how often we run into this situation. Releasing a container that fails immediately afterwards should happen fairly rarely, I would assume. Moreover, at some point an idle `TaskManager` should be released and, thus, also the underlying container.


---

[GitHub] flink issue #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink does n...

Posted by Clarkkkkk <gi...@git.apache.org>.
Github user Clarkkkkk commented on the issue:

    https://github.com/apache/flink/pull/6192
  
    @tillrohrmann But I think this only works if the restart strategy is RestartAllStrategy. 


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by Clarkkkkk <gi...@git.apache.org>.
Github user Clarkkkkk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199036840
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
     					if (yarnWorkerNode != null) {
     						// Container completed unexpectedly ~> start a new one
     						final Container container = yarnWorkerNode.getContainer();
    -						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
    -						closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
    +						// check WorkerRegistration status to avoid requesting containers more than required
    +						if (checkWorkerRegistrationWithResourceId(resourceId)) {
    --- End diff --
    
    Yes, it might happen. The problem is not as easy as I thought. The actual cause is that the resource was released before a full restart, but the onContainerCompleted callback fired after the full restart. Since the full restart requests all the containers needed as configured, if onContainerCompleted is called after that, it will request an additional container and hold on to it even though it is not needed.


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r198917467
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
     					if (yarnWorkerNode != null) {
     						// Container completed unexpectedly ~> start a new one
     						final Container container = yarnWorkerNode.getContainer();
    -						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
    -						closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
    +						// check WorkerRegistration status to avoid requesting containers more than required
    +						if (checkWorkerRegistrationWithResourceId(resourceId)) {
    --- End diff --
    
    Wouldn't that prevent container restarts if the container failure happened before the `TaskManager` registered at the `ResourceManager`, because then, `ResourceManager#taskExecutors` would not contain the given `ResourceID`?


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by Clarkkkkk <gi...@git.apache.org>.
Github user Clarkkkkk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199035236
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -1120,5 +1120,28 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     			return CompletableFuture.completedFuture(null);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Work Registration status checking
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Check if the executor with given resourceID is still in taskExecutors map
    +	 * @param resourceID an ID mapping to a task executor
    +	 * @return
    +	 */
    +	protected boolean checkWorkerRegistrationWithResourceId(ResourceID resourceID) {
    +		boolean status = taskExecutors.containsKey(resourceID);
    +		if (!status) {
    +			log.debug("No open TaskExecutor connection {}. Ignoring close TaskExecutor connection.", resourceID);
    +		}
    +		return status;
    +	}
    +
    +	@VisibleForTesting
    +	public void triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) {
    --- End diff --
    
    OK.


---

[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r198915014
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception {
     			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
     		}};
     	}
    +
    +	@Test
    +	public void testOnContainerCompleted() throws Exception {
    +		new Context() {{
    +			startResourceManager();
    +			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
    +				rmServices.slotManager.registerSlotRequest(
    +					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
    +				return null;
    +			});
    +			// wait for the registerSlotRequest completion
    +			registerSlotRequestFuture.get();
    +			// Callback from YARN when container is allocated.
    +			Container testingContainer = mock(Container.class);
    +			when(testingContainer.getId()).thenReturn(
    +				ContainerId.newInstance(
    +					ApplicationAttemptId.newInstance(
    +						ApplicationId.newInstance(System.currentTimeMillis(), 1),
    +						1),
    +					1));
    +			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
    +			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
    +			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
    +
    +			ImmutableList<Container> testingContainerList = ImmutableList.of(testingContainer);
    +			resourceManager.onContainersAllocated(testingContainerList);
    +			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
    +			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
    +
    +			// Remote task executor registers with YarnResourceManager.
    +			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
    --- End diff --
    
    Can we use the `TestingTaskExecutorGateway` here?


---