Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/06/07 05:41:23 UTC

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/6132

    [FLINK-9456][Distributed Coordination]Let ResourceManager notify JobManager about failed/killed TaskManagers.

    ## What is the purpose of the change
    
    *Often, the ResourceManager learns faster about TaskManager failures/killings because it directly communicates with the underlying resource management framework. Instead of only relying on the JobManager's heartbeat to figure out that a TaskManager has died, we should additionally send a signal from the ResourceManager to the JobManager if a TaskManager has died. That way, we can react faster to TaskManager failures and recover our running job/s.*
    
    ## Brief change log
    
      - *Add `JobMasterGateway#taskManagerTerminated()` to notify the JobMaster that a task manager has terminated, and perform the disconnection there.*
      - *Let the `ResourceManager` notify the JobMaster when a task manager terminates.*
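
A minimal sketch of the idea behind this change, assuming a heavily simplified job master that tracks task managers by a string id and a last-heartbeat timestamp. The class, the string ids, and the 10-second timeout are illustrative assumptions, not Flink's actual types or defaults. It only contrasts the slow path (waiting for a heartbeat timeout) with the fast path (an explicit termination signal):

```java
import java.util.HashMap;
import java.util.Map;

public class TerminationNotificationSketch {

    /** Hypothetical, simplified job master that tracks the last heartbeat of each task manager. */
    static class JobMaster {
        private final long heartbeatTimeoutMillis;
        private final Map<String, Long> lastHeartbeat = new HashMap<>();

        JobMaster(long heartbeatTimeoutMillis) {
            this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
        }

        void heartbeatFromTaskManager(String resourceId, long nowMillis) {
            lastHeartbeat.put(resourceId, nowMillis);
        }

        /** Slow path: a lost task manager is dropped only after the timeout has elapsed. */
        void checkHeartbeats(long nowMillis) {
            lastHeartbeat.entrySet()
                .removeIf(e -> nowMillis - e.getValue() > heartbeatTimeoutMillis);
        }

        /** Fast path: the resource manager reports the termination explicitly. */
        void taskManagerTerminated(String resourceId, Exception cause) {
            lastHeartbeat.remove(resourceId);
        }

        boolean isConnected(String resourceId) {
            return lastHeartbeat.containsKey(resourceId);
        }
    }

    public static void main(String[] args) {
        JobMaster jobMaster = new JobMaster(10_000L); // arbitrary timeout for the sketch
        jobMaster.heartbeatFromTaskManager("tm-1", 0L);
        jobMaster.heartbeatFromTaskManager("tm-2", 0L);

        // tm-1 is killed shortly after t=0; heartbeats alone do not reveal this yet.
        jobMaster.checkHeartbeats(1_000L);
        System.out.println(jobMaster.isConnected("tm-1")); // true

        // The explicit signal removes tm-1 immediately.
        jobMaster.taskManagerTerminated("tm-1", new Exception("container killed"));
        System.out.println(jobMaster.isConnected("tm-1")); // false

        // tm-2 is only dropped once its heartbeat has timed out.
        jobMaster.checkHeartbeats(20_000L);
        System.out.println(jobMaster.isConnected("tm-2")); // false
    }
}
```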
    
    ## Verifying this change
    
    - Once this approach is verified in general, I will add tests for it.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
    No

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink FLINK-9456

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6132
    
----
commit 652ac037ef3edc75cea0abd4966c2154d6e5fbc0
Author: sihuazhou <su...@...>
Date:   2018-05-10T06:36:27Z

    Let ResourceManager notify JobManager about failed/killed TaskManagers.

----


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198910651
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
    @@ -53,4 +56,13 @@
     	 * @param cause of the allocation failure
     	 */
     	void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
    +
    +	/**
    +	 * Notifies that the task manager has been terminated.
    +	 * @param jobId to be notified
    +	 * @param resourceID identifying the terminated task manager
    +	 * @param allocationIDs of the job held that belong to this task manager
    +	 * @param cause of the task manager termination.
    +	 */
    +	void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
    --- End diff --
    
    I think the notification about a terminated `TaskManager` should not come from the `SlotManager` but from the `ResourceManager`. Thus, we should not need this method.


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6132


---

[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6132
  
    The failure on Travis is unrelated.


---

[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6132
  
    cc @tillrohrmann 


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198490029
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
    @@ -45,6 +46,9 @@
     	/** Allocation id for which this slot has been allocated. */
     	private AllocationID allocationId;
     
    +	/** Allocation id for which this slot has been allocated. */
    +	private JobID jobId;
    --- End diff --
    
    Should be annotated with `@Nullable`


---

[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6132
  
    Thanks a lot @sihuazhou. Ping me once you've updated this PR.


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r199315500
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
     			operatorBackPressureStats.orElse(null)));
     	}
     
    +	@Override
    +	public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
    --- End diff --
    
    My earlier thinking was that the `RM` needed to pass along the `allocationIds` assigned to the `JM`, because the `SlotManager` may already have assigned slots to the `JM` when the `TM` is killed before the `JM` has established a connection to it. This was mainly meant to address https://issues.apache.org/jira/browse/FLINK-9351, but I think the approach you suggested fixes the problem in FLINK-9351 as a side effect.
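
The race described above can be illustrated with a small sketch. It assumes a hypothetical job master that keeps a set of pending allocations and a map of task managers that have already connected; all names and the string ids are stand-ins, not Flink's real data structures. Under those assumptions, a notification keyed only by the task manager cannot clean up an allocation on a task manager that never connected, while a notification keyed by the allocation can:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UnconnectedTaskManagerSketch {

    /** Hypothetical, simplified view of the job master's slot bookkeeping. */
    static class JobMaster {
        /** Allocations requested from the resource manager but not yet offered by a task manager. */
        final Set<String> pendingAllocations = new HashSet<>();
        /** Allocations offered by task managers that have connected, keyed by resource id. */
        final Map<String, Set<String>> connectedTaskManagers = new HashMap<>();

        /** Task-manager-level signal: can only release what the job master already knows about. */
        boolean notifyTaskManagerTermination(String resourceId) {
            return connectedTaskManagers.remove(resourceId) != null;
        }

        /** Allocation-level signal: also works if the task manager never connected. */
        boolean notifyAllocationFailure(String allocationId) {
            boolean wasPending = pendingAllocations.remove(allocationId);
            boolean wasOffered = false;
            for (Set<String> allocations : connectedTaskManagers.values()) {
                wasOffered |= allocations.remove(allocationId);
            }
            return wasPending || wasOffered;
        }
    }

    public static void main(String[] args) {
        JobMaster jobMaster = new JobMaster();

        // The slot manager assigned alloc-a on tm-1, but tm-1 dies before connecting.
        jobMaster.pendingAllocations.add("alloc-a");

        System.out.println(jobMaster.notifyTaskManagerTermination("tm-1")); // false
        System.out.println(jobMaster.pendingAllocations);                   // [alloc-a]

        System.out.println(jobMaster.notifyAllocationFailure("alloc-a"));   // true
        System.out.println(jobMaster.pendingAllocations.isEmpty());         // true
    }
}
```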


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198491021
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
    @@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
     	 * not available (yet).
     	 */
     	CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId);
    +
    +	/**
    +	 * Notifies that the task manager has terminated.
    +	 *
    +	 * @param resourceID identifying the task manager
    +	 * @param allocationIDs held by this job that belong to the task manager
    --- End diff --
    
    I think this parameter is not needed.


---

[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6132
  
    @tillrohrmann Thanks for your review and the good suggestions; I'm changing the code accordingly.


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198912464
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -717,6 +728,32 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe
     			mainThreadExecutor);
     	}
     
    +	public void notifyTaskManagerFailed(ResourceID resourceID, InstanceID instanceID, Exception cause) {
    +		final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
    +		if (taskManagerRegistration != null) {
    +			final HashMap<JobID, Set<AllocationID>> jobAndAllocationIDMap = new HashMap<>(4);
    +			for (SlotID slotID : taskManagerRegistration.getSlots()) {
    +				TaskManagerSlot taskManagerSlot = slots.get(slotID);
    +				AllocationID allocationID = taskManagerSlot.getAllocationId();
    +				if (allocationID != null) {
    +					JobID jobId = taskManagerSlot.getJobId();
    +					Set<AllocationID> jobAllocationIDSet = jobAndAllocationIDMap.get(jobId);
    +					if (jobAllocationIDSet == null) {
    +						jobAllocationIDSet = new HashSet<>(2);
    +						jobAndAllocationIDMap.put(jobId, jobAllocationIDSet);
    +					}
    +					jobAllocationIDSet.add(allocationID);
    +				}
    +			}
    +
    +			for (Map.Entry<JobID, Set<AllocationID>> entry : jobAndAllocationIDMap.entrySet()) {
    +				resourceActions.notifyTaskManagerTerminated(entry.getKey(), resourceID, entry.getValue(), cause);
    +			}
    +		} else {
    +			LOG.warn("TaskManager failed before registering with slot manager successfully.");
    +		}
    --- End diff --
    
    This looks a little bit complicated. Moreover, I don't really like that the control flow is: ResourceManager -> SlotManager -> ResourceManager -> JobManager.
    
    What about leveraging the existing `ResourceActions#notifyAllocationFailure` method? We could say that we call this method not only when a pending slot request fails but also when we remove a slot. Unregistering a `TaskManager` from the `SlotManager` would then remove its slots, which would trigger a `notifyAllocationFailure` message for each allocated slot. We would then have to introduce a `JobMasterGateway#notifyAllocationFailure` method, which we can call from `ResourceActionsImpl#notifyAllocationFailure`. The implementation on the `JobMaster` side would then simply call `SlotPool#failAllocation`.
    
    By doing it that way, we send multiple messages (might not be ideal) but we reuse most of the existing code paths without introducing special case logic. What do you think?
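
A minimal sketch of the flow proposed above, assuming simplified stand-in types. The nested interfaces and classes reuse the names from the comment (`ResourceActions`, `JobMasterGateway`, `SlotPool`, `SlotManager`), but their signatures, the string ids, and the lookup map are hypothetical and are not taken from Flink:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AllocationFailureFlowSketch {

    /** Stand-in for ResourceActions; only the callback discussed here. */
    interface ResourceActions {
        void notifyAllocationFailure(String jobId, String allocationId, Exception cause);
    }

    /** Stand-in for the proposed JobMasterGateway#notifyAllocationFailure. */
    interface JobMasterGateway {
        void notifyAllocationFailure(String allocationId, Exception cause);
    }

    /** Stand-in for SlotPool; records which allocations were failed. */
    static class SlotPool {
        final List<String> failedAllocations = new ArrayList<>();

        void failAllocation(String allocationId, Exception cause) {
            failedAllocations.add(allocationId);
        }
    }

    /** Job master side: simply delegates to the slot pool. */
    static class JobMaster implements JobMasterGateway {
        final SlotPool slotPool = new SlotPool();

        @Override
        public void notifyAllocationFailure(String allocationId, Exception cause) {
            slotPool.failAllocation(allocationId, cause);
        }
    }

    /** A slot as tracked by the slot manager; both ids are null while the slot is free. */
    static class Slot {
        final String jobId;
        final String allocationId;

        Slot(String jobId, String allocationId) {
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    /** Stand-in for SlotManager: removing slots sends one notification per allocated slot. */
    static class SlotManager {
        private final Map<String, List<Slot>> slotsByTaskManager = new HashMap<>();
        private final ResourceActions resourceActions;

        SlotManager(ResourceActions resourceActions) {
            this.resourceActions = resourceActions;
        }

        void registerTaskManager(String taskManagerId, List<Slot> slots) {
            slotsByTaskManager.put(taskManagerId, new ArrayList<>(slots));
        }

        void unregisterTaskManager(String taskManagerId, Exception cause) {
            List<Slot> removed = slotsByTaskManager.remove(taskManagerId);
            if (removed == null) {
                return; // unknown task manager, nothing to clean up
            }
            for (Slot slot : removed) {
                if (slot.allocationId != null) {
                    resourceActions.notifyAllocationFailure(slot.jobId, slot.allocationId, cause);
                }
            }
        }
    }

    public static void main(String[] args) {
        JobMaster jobMaster = new JobMaster();
        Map<String, JobMasterGateway> jobMasters = new HashMap<>();
        jobMasters.put("job-1", jobMaster);

        // Resource manager side: forward each failed allocation to the owning job master.
        ResourceActions actions = (jobId, allocationId, cause) -> {
            JobMasterGateway gateway = jobMasters.get(jobId);
            if (gateway != null) {
                gateway.notifyAllocationFailure(allocationId, cause);
            }
        };

        SlotManager slotManager = new SlotManager(actions);
        slotManager.registerTaskManager("tm-1", Arrays.asList(
            new Slot("job-1", "alloc-a"), new Slot(null, null), new Slot("job-1", "alloc-b")));

        slotManager.unregisterTaskManager("tm-1", new Exception("container killed"));
        System.out.println(jobMaster.slotPool.failedAllocations); // [alloc-a, alloc-b]
    }
}
```

In this sketch the job master receives one message per allocated slot, which matches the trade-off mentioned above: more messages, but no special-case path for task manager termination.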


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198491333
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
    @@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
     	 * not available (yet).
     	 */
     	CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId);
    +
    +	/**
    +	 * Notifies that the task manager has terminated.
    +	 *
    +	 * @param resourceID identifying the task manager
    +	 * @param allocationIDs held by this job that belong to the task manager
    +	 * @param cause of the task manager termination
    +	 */
    +	void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
    --- End diff --
    
    Method names should usually be verbs. What about `notifyTaskManagerTermination`?


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198491683
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     			return CompletableFuture.completedFuture(null);
     		}
     	}
    +
    +	protected void notifyTaskManagerCompleted(ResourceID resourceID, Exception cause) {
    +		WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
    +		if (workerRegistration != null) {
    +			slotManager.notifyTaskManagerFailed(resourceID, workerRegistration.getInstanceID(), cause);
    +		} else {
    +			log.warn("TaskManager failed before registering with ResourceManager successfully.");
    --- End diff --
    
    This should be a debug log message.


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198490490
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
     			operatorBackPressureStats.orElse(null)));
     	}
     
    +	@Override
    +	public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
    --- End diff --
    
    For what do we need the `allocationIds` parameter here?


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198491884
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
    @@ -53,4 +56,13 @@
     	 * @param cause of the allocation failure
     	 */
     	void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
    +
    +	/**
    +	 * Notifies that the task manager has been terminated.
    --- End diff --
    
    line break is missing here


---

[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6132
  
    Hi @tillrohrmann, I updated the PR. Could you please have a look again?


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198493145
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---
    @@ -1202,6 +1209,111 @@ public void testSlotRequestFailure() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests notify the job manager when the task manager is failed/killed.
    +	 */
    +	@Test
    +	public void testNotifyTaskManagerFailed() throws Exception {
    +
    +		final List<Tuple4<JobID, ResourceID, Set<AllocationID>, Exception>> notifiedTaskManagerInfos = new ArrayList<>();
    +
    +		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
    +			@Override
    +			public void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause) {
    +				notifiedTaskManagerInfos.add(new Tuple4<>(jobId, resourceID, allocationIDs, cause));
    +			}
    +		})) {
    --- End diff --
    
    Indentation looks a bit off here


---

[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198489897
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---
    @@ -619,7 +619,11 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) {
     			startNewWorker(launched.profile());
     		}
     
    -		closeTaskManagerConnection(id, new Exception(status.getMessage()));
    +		final Exception terminatedCause = new Exception(status.getMessage());
    --- End diff --
    
    let's call it `terminationCause`


---