Posted to issues@flink.apache.org by shuai-xu <gi...@git.apache.org> on 2017/10/23 10:53:28 UTC

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

GitHub user shuai-xu opened a pull request:

    https://github.com/apache/flink/pull/4887

    [FLINK-7870] [runtime] Cancel slot allocation to RM when requestSlot timed out in SlotPool

    
    ## What is the purpose of the change
    
    This pull request adds a cancelSlotRequest RPC between the slot pool and the resource manager. When a pending request times out in the slot pool, the slot pool sends a cancelSlotRequest RPC to the resource manager to cancel the earlier slot request, so that stale slot requests do not keep accumulating in the resource manager.
    
    ## Verifying this change
    This change added tests and can be verified as follows:
      - *Added a verify in SlotManagerTest to make sure the cancel logic
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shuai-xu/flink jira-xxxx

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4887.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4887
    
----
commit daf7fd5745659d94f5a84f669bc90b82b5e69e5e
Author: shuai.xus <sh...@alibaba-inc.com>
Date:   2017-10-17T09:57:18Z

    [FLINK-xxxx] slot pool cancel slot request to resource manager if timeout
    
    Summary: slot pool cancel slot request to resource manager if timeout
    
    Test Plan: unit test
    
    Reviewers: haitao.w
    
    Differential Revision: https://aone.alibaba-inc.com/code/D320749

commit 96f80187bb5ef1c268a62bdaf80151a70a04b002
Author: shuai.xus <sh...@alibaba-inc.com>
Date:   2017-10-19T04:13:01Z

    add more contract

----


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148715689
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
    @@ -52,4 +52,6 @@
     	 * @param cause of the allocation failure
     	 */
     	void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
    +
    +	void cancelResourceAllocation(ResourceProfile resourceProfile);
    --- End diff --
    
    I replied to this in my comment on the `SlotManager` diff.


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148551966
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -302,7 +302,12 @@ public boolean unregisterSlotRequest(AllocationID allocationId) {
     		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
     
     		if (null != pendingSlotRequest) {
    -			cancelPendingSlotRequest(pendingSlotRequest);
    +			if (pendingSlotRequest.isAssigned()) {
    +				cancelPendingSlotRequest(pendingSlotRequest);
    +			}
    +			else {
    +				resourceActions.cancelResourceAllocation(pendingSlotRequest.getResourceProfile());
    --- End diff --
    
    I think we should not immediately cancel ongoing resource allocations. The `SlotManager` could decide upon registration of a new worker whether this one is actually needed or not. In the latter case it could release the resource. This would also simplify the protocol since you don't know whether you still can cancel an ongoing resource allocation.


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148754465
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -302,7 +302,12 @@ public boolean unregisterSlotRequest(AllocationID allocationId) {
     		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
     
     		if (null != pendingSlotRequest) {
    -			cancelPendingSlotRequest(pendingSlotRequest);
    +			if (pendingSlotRequest.isAssigned()) {
    +				cancelPendingSlotRequest(pendingSlotRequest);
    +			}
    +			else {
    +				resourceActions.cancelResourceAllocation(pendingSlotRequest.getResourceProfile());
    --- End diff --
    
    Then this should be added as a separate feature because it is not strictly required by this PR here.
    
    Furthermore, I'm not sure whether this shouldn't be the responsibility of the `ResourceManager`. E.g. we could think about adding a timeout for container requests after which we cancel them. Additionally, if we add support for starting machines with multiple slots, then we shouldn't release a requested resource if only a single one of its slots is no longer needed. That is something else to consider before adding the cancel resource allocation method.


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148716173
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -874,6 +894,13 @@ public void handleError(final Exception exception) {
     	 */
     	public abstract boolean stopWorker(ResourceID resourceID);
     
    +	/**
    +	 * Cancel the allocation of a resource. If the resource allocation has not fulfilled, should cancel it.
    +	 *
    +	 * @param resourceProfile The resource description of the previous allocation
    +	 */
    +	public abstract void cancelNewWorker(ResourceProfile resourceProfile);
    --- End diff --
    
    I replied to this in my comment on the `SlotManager` diff.


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148715651
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -302,7 +302,12 @@ public boolean unregisterSlotRequest(AllocationID allocationId) {
     		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
     
     		if (null != pendingSlotRequest) {
    -			cancelPendingSlotRequest(pendingSlotRequest);
    +			if (pendingSlotRequest.isAssigned()) {
    +				cancelPendingSlotRequest(pendingSlotRequest);
    +			}
    +			else {
    +				resourceActions.cancelResourceAllocation(pendingSlotRequest.getResourceProfile());
    --- End diff --
    
    Yes, the SlotManager can decide to release resources beyond what it needs. But consider this worst case:
    1. The Mesos or YARN cluster does not have enough resources.
    2. A job asks for 100 workers of size A.
    3. Because there are not enough resources, the job fails over. The previous 100 requests are not cancelled, and the job asks for another 100.
    4. This repeats several times, until the pending requests for workers of size A reach 10000.
    5. A worker of size B crashes, so the job now needs only 100 workers of size A and 1 worker of size B. But YARN or Mesos thinks the job needs 10000 A and 1 B, because the requests are never cancelled.
    6. Mesos/YARN now has resources for 110 A, which is more than 100 A and 1 B, and it begins to assign resources to the job. But it first tries to allocate the 10000 containers of size A, so the job still cannot start because it lacks container B.
    7. As a result, the job may be unable to start for a long time even though the cluster now has enough resources.
    8. This did happen in our cluster, because our cluster is busy. So I think it's better to keep this protocol, and different resource managers can handle it according to their needs.
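
The accumulation described in steps 2 to 4 can be illustrated with a small calculation. This is only a sketch under stated assumptions: the class and method names are hypothetical, and the figure of 99 failovers is chosen so that the total matches the 10000 pending requests mentioned above (the comment itself only says "several times").

```java
class PendingRequestGrowthSketch {
    // Number of container requests still outstanding at the cluster manager
    // after the initial attempt plus the given number of failovers.
    static int outstandingRequests(int workersPerAttempt, int failovers, boolean cancelOnTimeout) {
        int outstanding = 0;
        for (int attempt = 0; attempt <= failovers; attempt++) {
            if (cancelOnTimeout) {
                // The previous attempt's requests are withdrawn before asking again.
                outstanding = 0;
            }
            outstanding += workersPerAttempt;
        }
        return outstanding;
    }

    public static void main(String[] args) {
        // Without cancellation the requests pile up across attempts.
        System.out.println(outstandingRequests(100, 99, false));
        // With cancellation only the latest attempt's requests remain.
        System.out.println(outstandingRequests(100, 99, true));
    }
}
```

Under these assumptions the uncancelled case grows linearly with the number of failovers, while the cancelled case stays at the size of a single attempt.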


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148552085
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
    @@ -52,4 +52,6 @@
     	 * @param cause of the allocation failure
     	 */
     	void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
    +
    +	void cancelResourceAllocation(ResourceProfile resourceProfile);
    --- End diff --
    
    I think we don't necessarily need this call. See my comment in the `SlotManager`.


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4887


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148553179
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -399,6 +399,26 @@ public void disconnectJobManager(final JobID jobId, final Exception cause) {
     	}
     
     	@Override
    +	public void cancelSlotRequest(JobID jobID, JobMasterId jobMasterId, AllocationID allocationID) {
    +
    +		// As the slot allocations are async, it can not avoid all redundent slots, but should best effort.
    +		JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobID);
    +
    +		if (null != jobManagerRegistration) {
    +			if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
    +				log.info("Cancel slot request for job {} with allocation id {}.",
    +						jobID, allocationID);
    +
    +				slotManager.unregisterSlotRequest(allocationID);
    +			} else {
    +				log.info("Job manager {} is not the leader of job {}.", jobMasterId, jobID);
    +			}
    +		} else {
    +			log.warn("Could not find registered job manager for job {}.", jobID);
    +		}
    --- End diff --
    
    This could be simplified to `slotManager.unregisterSlotRequest(allocationId)` if we change `cancelSlotRequest(JobID, JobMasterId, AllocationID)` to `cancelSlotRequest(AllocationID)`.
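
A minimal sketch of the reduced signature suggested above. The `MiniSlotManager` and `MiniResourceManager` classes are hypothetical stand-ins, not Flink's real classes, and allocation ids are modelled as plain strings.

```java
import java.util.HashMap;
import java.util.Map;

class SimplifiedCancelSketch {
    // Hypothetical stand-in for the SlotManager's pending-request bookkeeping.
    static class MiniSlotManager {
        private final Map<String, String> pendingSlotRequests = new HashMap<>();

        void registerSlotRequest(String allocationId, String resourceProfile) {
            pendingSlotRequests.put(allocationId, resourceProfile);
        }

        // Returns true if a pending request with this id existed and was removed.
        boolean unregisterSlotRequest(String allocationId) {
            return pendingSlotRequests.remove(allocationId) != null;
        }
    }

    // Hypothetical stand-in for the ResourceManager side of the RPC.
    static class MiniResourceManager {
        final MiniSlotManager slotManager = new MiniSlotManager();

        // Reduced signature: the allocation id alone identifies the request,
        // so no job id or job master id lookup is performed here.
        void cancelSlotRequest(String allocationId) {
            slotManager.unregisterSlotRequest(allocationId);
        }
    }
}
```

The design choice in this sketch is that cancelling an unknown allocation id is a silent no-op, which keeps the call safe to send as a best-effort message.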


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148555569
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -363,6 +363,9 @@ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
     	private void checkTimeoutSlotAllocation(AllocationID allocationID) {
     		PendingRequest request = pendingRequests.remove(allocationID);
     		if (request != null && !request.getFuture().isDone()) {
    +			if (resourceManagerGateway != null) {
    +				resourceManagerGateway.cancelSlotRequest(jobId, jobMasterId, allocationID);
    --- End diff --
    
    I think we could actually add this call as an exceptional callback that is triggered by the `TimeoutException`. In `requestSlotFromResourceManager` we could add something like `future.whenComplete((value, throwable) -> { if (throwable instanceof TimeoutException) { resourceManagerGateway.cancelSlotRequest(); }});`
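
The callback suggested above could look roughly like the following self-contained sketch. `CancelGateway` is a hypothetical stand-in for the resource manager gateway, and the allocation id is modelled as a string. Only `CompletableFuture.whenComplete` and `TimeoutException` are real JDK APIs here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the ResourceManager gateway; not Flink's real interface.
interface CancelGateway {
    void cancelSlotRequest(String allocationId);
}

class TimeoutCancelSketch {
    // Registers a callback that cancels the request at the gateway
    // only when the future fails with a TimeoutException.
    static void cancelOnTimeout(
            CompletableFuture<String> future, String allocationId, CancelGateway gateway) {
        future.whenComplete((value, throwable) -> {
            // instanceof is false for null, so normal completion is ignored.
            if (throwable instanceof TimeoutException) {
                gateway.cancelSlotRequest(allocationId);
            }
        });
    }
}
```

Note that this check sees the raw exception only when the callback is attached to the future that was completed exceptionally. On a dependent stage the exception may arrive wrapped in a `CompletionException`, in which case the cause would need to be inspected instead.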


---

[GitHub] flink issue #4887: [FLINK-7870] [runtime] Cancel slot allocation to RM when ...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on the issue:

    https://github.com/apache/flink/pull/4887
  
    @tillrohrmann could you help review this PR? Thank you.


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148552792
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -874,6 +894,13 @@ public void handleError(final Exception exception) {
     	 */
     	public abstract boolean stopWorker(ResourceID resourceID);
     
    +	/**
    +	 * Cancel the allocation of a resource. If the resource allocation has not fulfilled, should cancel it.
    +	 *
    +	 * @param resourceProfile The resource description of the previous allocation
    +	 */
    +	public abstract void cancelNewWorker(ResourceProfile resourceProfile);
    --- End diff --
    
    See my comment in the `SlotManager`. I think we don't need this call here. Rather, I would like the SlotManager to decide upon registration of a new resource whether it needs it or not and release the resource via calling `ResourceManagerActions#releaseResource`.
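
A toy sketch of the alternative described above, where cancelling a request only forgets it and the decision to release happens when a worker registers. It assumes one slot per worker and uses hypothetical `MiniSlotManager` and `ResourceActions` types. Only the method name `releaseResource` is taken from the comment.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class RegistrationDecisionSketch {
    // Hypothetical stand-in for the release callback named in the comment above.
    interface ResourceActions {
        void releaseResource(String workerId);
    }

    // Toy slot manager: one slot per worker, requests identified by allocation id.
    static class MiniSlotManager {
        private final Deque<String> pendingRequests = new ArrayDeque<>();
        private final ResourceActions resourceActions;

        MiniSlotManager(ResourceActions resourceActions) {
            this.resourceActions = resourceActions;
        }

        void registerSlotRequest(String allocationId) {
            pendingRequests.add(allocationId);
        }

        // Cancelling only forgets the request; no in-flight allocation is touched.
        boolean unregisterSlotRequest(String allocationId) {
            return pendingRequests.remove(allocationId);
        }

        // Called when a newly started worker registers. Returns true if it is kept.
        boolean registerWorker(String workerId) {
            if (pendingRequests.poll() == null) {
                // No pending request is left for this worker, so hand it back.
                resourceActions.releaseResource(workerId);
                return false;
            }
            // The oldest pending request is fulfilled by this worker.
            return true;
        }
    }
}
```

In this sketch a worker that arrives after its request was cancelled is released on registration, so no separate cancel call towards the cluster manager is modelled.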


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148754660
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
    @@ -316,6 +316,13 @@ private void requestSlotFromResourceManager(
     
     		pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
     
    +		future.whenComplete(
    +			(value, throwable) -> {
    +				if (throwable != null && throwable instanceof TimeoutException) {
    --- End diff --
    
    I think we should `cancelSlotRequest` for all `Throwables`.
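
A sketch of the broader condition suggested above, cancelling on any failure and not only on a timeout. `SlotRequestCanceller` is a hypothetical stand-in for the gateway call, and allocation ids are plain strings.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the gateway's cancel call; not Flink's real interface.
interface SlotRequestCanceller {
    void cancelSlotRequest(String allocationId);
}

class CancelOnAnyFailureSketch {
    // Cancels the request at the resource manager whenever the future
    // completes exceptionally, regardless of the exception type.
    static void cancelOnFailure(
            CompletableFuture<String> future, String allocationId, SlotRequestCanceller canceller) {
        future.whenComplete((value, throwable) -> {
            if (throwable != null) {
                canceller.cancelSlotRequest(allocationId);
            }
        });
    }
}
```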


---

[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148550603
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---
    @@ -71,6 +71,18 @@
     		@RpcTimeout Time timeout);
     
     	/**
    +	 * Cancel the slot allocation requests from the resource manager.
    +	 *
    +	 * @param jobID JobID for which the JobManager was the leader
    +	 * @param jobMasterId id of the JobMaster
    +	 * @param allocationID The slot to request
    +	 */
    +	void cancelSlotRequest(
    +		JobID jobID,
    +		JobMasterId jobMasterId,
    +		AllocationID allocationID);
    --- End diff --
    
    I think this could be `void cancelSlotRequest(AllocationID allocationId)`. Not sure why we need the other information.


---