Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/06/07 05:41:23 UTC
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
GitHub user sihuazhou opened a pull request:
https://github.com/apache/flink/pull/6132
[FLINK-9456][Distributed Coordination]Let ResourceManager notify JobManager about failed/killed TaskManagers.
## What is the purpose of the change
*Often, the ResourceManager learns about TaskManager failures/kills faster than the JobManager, because it communicates directly with the underlying resource management framework. Instead of relying only on the JobManager's heartbeat to detect that a TaskManager has died, the ResourceManager should additionally send a signal to the JobManager when a TaskManager dies. That way, we can react faster to TaskManager failures and recover the running job(s).*
## Brief change log
- *Add `JobMasterGateway#taskManagerTerminated()` to notify the JobMaster that a TaskManager has terminated, and perform the disconnection there.*
- *Let the `ResourceManager` notify the JobMaster when a TaskManager terminates.*
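A minimal sketch of the notification path described above, under simplifying assumptions: plain `String` IDs stand in for Flink's `ResourceID`, `AllocationID` and `JobID`, and `ResourceManagerSketch`, `JobMasterSketch` and `onWorkerTerminated` are hypothetical names, not Flink's actual classes. When the resource framework reports a dead TaskManager, the ResourceManager immediately tells every JobMaster that held slots on it, instead of waiting for the JobMaster's heartbeat timeout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TaskManagerTerminationSketch {

    /** Hypothetical stand-in for the callback this PR adds to JobMasterGateway. */
    interface JobMasterGateway {
        void taskManagerTerminated(String resourceId, Set<String> allocationIds, Exception cause);
    }

    /** Hypothetical JobMaster that only records which TaskManagers it was told about. */
    static class JobMasterSketch implements JobMasterGateway {
        final List<String> disconnected = new ArrayList<>();

        @Override
        public void taskManagerTerminated(String resourceId, Set<String> allocationIds, Exception cause) {
            // A real JobMaster would disconnect the TaskManager and fail the affected slots here.
            disconnected.add(resourceId);
        }
    }

    /** Hypothetical ResourceManager that tracks which jobs hold slots on which TaskManager. */
    static class ResourceManagerSketch {
        // resourceId -> (jobId -> allocation ids of that job on this TaskManager)
        final Map<String, Map<String, Set<String>>> allocationsByTaskManager = new HashMap<>();
        final Map<String, JobMasterGateway> jobMasters = new HashMap<>();

        /** Invoked when the resource framework reports that a TaskManager's container is gone. */
        void onWorkerTerminated(String resourceId, Exception cause) {
            Map<String, Set<String>> perJob = allocationsByTaskManager.remove(resourceId);
            if (perJob == null) {
                return; // unknown or already handled TaskManager: nothing to notify
            }
            for (Map.Entry<String, Set<String>> entry : perJob.entrySet()) {
                JobMasterGateway jobMaster = jobMasters.get(entry.getKey());
                if (jobMaster != null) {
                    jobMaster.taskManagerTerminated(resourceId, entry.getValue(), cause);
                }
            }
        }
    }

    /** Wires one job with two slots on "tm-1", then simulates the container being killed. */
    static List<String> run() {
        ResourceManagerSketch rm = new ResourceManagerSketch();
        JobMasterSketch jm = new JobMasterSketch();
        rm.jobMasters.put("job-1", jm);

        Map<String, Set<String>> perJob = new HashMap<>();
        perJob.put("job-1", new HashSet<>(Arrays.asList("alloc-1", "alloc-2")));
        rm.allocationsByTaskManager.put("tm-1", perJob);

        rm.onWorkerTerminated("tm-1", new Exception("container killed"));
        rm.onWorkerTerminated("tm-1", new Exception("duplicate report")); // ignored
        return jm.disconnected;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [tm-1]
    }
}
```

Removing the TaskManager's entry before notifying makes a duplicate termination report a no-op, so each JobMaster is told at most once.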
## Verifying this change
- Once this approach is verified in general, I will add tests for it.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
No
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sihuazhou/flink FLINK-9456
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6132.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6132
----
commit 652ac037ef3edc75cea0abd4966c2154d6e5fbc0
Author: sihuazhou <su...@...>
Date: 2018-05-10T06:36:27Z
Let ResourceManager notify JobManager about failed/killed TaskManagers.
----
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198910651
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
@@ -53,4 +56,13 @@
* @param cause of the allocation failure
*/
void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
+
+ /**
+ * Notifies that the task manager has been terminated.
+ * @param jobId to be notified
+ * @param resourceID identifying the terminated task manager
+ * @param allocationIDs of the job held that belong to this task manager
+ * @param cause of the task manager termination.
+ */
+ void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
--- End diff --
I think the notification about a terminated `TaskManager` should not come from the `SlotManager` but from the `ResourceManager`. Thus, we should not need this method.
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6132
---
[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6132
The failure on Travis is unrelated.
---
[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6132
cc @tillrohrmann
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198490029
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
@@ -45,6 +46,9 @@
/** Allocation id for which this slot has been allocated. */
private AllocationID allocationId;
+ /** Allocation id for which this slot has been allocated. */
+ private JobID jobId;
--- End diff --
Should be annotated with `@Nullable`
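A minimal sketch of how the field could look with the annotation. To keep it self-contained, a locally declared `@Nullable` stands in for `javax.annotation.Nullable`, `String` stands in for `AllocationID`/`JobID`, and `TaskManagerSlotSketch` with its methods is hypothetical. The sketch also gives the `jobId` field its own Javadoc instead of repeating the allocation-id comment. The annotation documents that both fields are `null` while the slot is free, so callers must check before using them.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class TaskManagerSlotSketch {

    /** Local stand-in for javax.annotation.Nullable so the sketch compiles without extra dependencies. */
    @Documented
    @Retention(RetentionPolicy.CLASS)
    @Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
    @interface Nullable {}

    /** Allocation id for which this slot has been allocated; null while the slot is free. */
    @Nullable
    private String allocationId;

    /** Job id for which this slot has been allocated; null while the slot is free. */
    @Nullable
    private String jobId;

    void assign(String allocationId, String jobId) {
        this.allocationId = allocationId;
        this.jobId = jobId;
    }

    void free() {
        this.allocationId = null;
        this.jobId = null;
    }

    @Nullable
    String getJobId() {
        return jobId;
    }

    boolean isAllocated() {
        return allocationId != null;
    }
}
```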
---
[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/6132
Thanks a lot @sihuazhou. Ping me once you've updated this PR.
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r199315500
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
operatorBackPressureStats.orElse(null)));
}
+ @Override
+ public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
--- End diff --
My original idea was that the `RM` needed to pass along the `allocationIds` assigned to the `JM`, because the `SlotManager` might already have assigned slots to the `JM` while the `TM` was killed before the `JM` had established a connection to it. This was mainly meant to address the issue in https://issues.apache.org/jira/browse/FLINK-9351, but I think the approach you suggested fixes the FLINK-9351 problem as well.
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198491021
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
* not available (yet).
*/
CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+ /**
+ * Notifies that the task manager has terminated.
+ *
+ * @param resourceID identifying the task manager
+ * @param allocationIDs held by this job that belong to the task manager
--- End diff --
I think this parameter is not needed.
---
[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6132
@tillrohrmann Thanks for your review and the good suggestions. I'm changing the code accordingly.
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198912464
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -717,6 +728,32 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe
mainThreadExecutor);
}
+ public void notifyTaskManagerFailed(ResourceID resourceID, InstanceID instanceID, Exception cause) {
+ final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
+ if (taskManagerRegistration != null) {
+ final HashMap<JobID, Set<AllocationID>> jobAndAllocationIDMap = new HashMap<>(4);
+ for (SlotID slotID : taskManagerRegistration.getSlots()) {
+ TaskManagerSlot taskManagerSlot = slots.get(slotID);
+ AllocationID allocationID = taskManagerSlot.getAllocationId();
+ if (allocationID != null) {
+ JobID jobId = taskManagerSlot.getJobId();
+ Set<AllocationID> jobAllocationIDSet = jobAndAllocationIDMap.get(jobId);
+ if (jobAllocationIDSet == null) {
+ jobAllocationIDSet = new HashSet<>(2);
+ jobAndAllocationIDMap.put(jobId, jobAllocationIDSet);
+ }
+ jobAllocationIDSet.add(allocationID);
+ }
+ }
+
+ for (Map.Entry<JobID, Set<AllocationID>> entry : jobAndAllocationIDMap.entrySet()) {
+ resourceActions.notifyTaskManagerTerminated(entry.getKey(), resourceID, entry.getValue(), cause);
+ }
+ } else {
+ LOG.warn("TaskManager failed before registering with slot manager successfully.");
+ }
--- End diff --
This looks a little bit complicated. Moreover, I don't really like that the control flow is: ResourceManager -> SlotManager -> ResourceManager -> JobManager.
What about leveraging the existing `ResourceActions#notifyAllocationFailure` method? We could say that we call this method not only in case of a failed pending slot request but also when we remove a slot. Then unregistering a `TaskManager` from the `SlotManager` would remove its slots, which in turn would trigger a `notifyAllocationFailure` message for each allocated slot. We would then have to introduce a `JobMasterGateway#notifyAllocationFailure` which we can call from `ResourceActionsImpl#notifyAllocationFailure`. The implementation on the `JobMaster` side would then simply call `SlotPool#failAllocation`.
By doing it that way, we send multiple messages (might not be ideal) but we reuse most of the existing code paths without introducing special case logic. What do you think?
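The proposed flow (unregister TaskManager, remove its slots, one `notifyAllocationFailure` per allocated slot, forwarded to the JobMaster, which fails the allocation in its slot pool) can be sketched as follows. This is a simplified model, not Flink's code: IDs are plain `String`s, and `SlotManagerSketch`, `SlotPoolSketch`, `Slot` and `run` are hypothetical; only the method names `notifyAllocationFailure` and `failAllocation` come from the comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class AllocationFailureFlowSketch {

    /** Hypothetical stand-in for the JobMaster's slot pool. */
    static class SlotPoolSketch {
        final List<String> failedAllocations = new ArrayList<>();

        void failAllocation(String allocationId, Exception cause) {
            // A real slot pool would release the slot and fail the tasks deployed into it.
            failedAllocations.add(allocationId);
        }
    }

    /** Hypothetical JobMaster endpoint with the proposed notifyAllocationFailure method. */
    interface JobMasterGateway {
        void notifyAllocationFailure(String allocationId, Exception cause);
    }

    /** Mirrors the shape of the existing ResourceActions#notifyAllocationFailure(jobId, allocationId, cause). */
    interface ResourceActions {
        void notifyAllocationFailure(String jobId, String allocationId, Exception cause);
    }

    /** A slot as seen by the SlotManager: its TaskManager and, if allocated, the allocation and job. */
    static final class Slot {
        final String taskManagerId;
        String allocationId; // null while free
        String jobId;        // null while free

        Slot(String taskManagerId) {
            this.taskManagerId = taskManagerId;
        }
    }

    static class SlotManagerSketch {
        final Map<String, Slot> slots = new HashMap<>();
        final ResourceActions resourceActions;

        SlotManagerSketch(ResourceActions resourceActions) {
            this.resourceActions = resourceActions;
        }

        /** Removing a TaskManager removes its slots; each allocated slot reports an allocation failure. */
        void unregisterTaskManager(String taskManagerId, Exception cause) {
            Iterator<Slot> it = slots.values().iterator();
            while (it.hasNext()) {
                Slot slot = it.next();
                if (slot.taskManagerId.equals(taskManagerId)) {
                    it.remove();
                    if (slot.allocationId != null) {
                        resourceActions.notifyAllocationFailure(slot.jobId, slot.allocationId, cause);
                    }
                }
            }
        }
    }

    /** Unregisters "tm-1", which holds one allocated and one free slot, and returns the failed allocations. */
    static List<String> run() {
        SlotPoolSketch slotPool = new SlotPoolSketch();
        Map<String, JobMasterGateway> jobMasters = new HashMap<>();
        // The JobMaster side simply delegates to the slot pool, as suggested in the comment.
        jobMasters.put("job-1", slotPool::failAllocation);

        // ResourceActionsImpl-like forwarding: look up the job's JobMaster and notify it.
        ResourceActions actions = (jobId, allocationId, cause) -> {
            JobMasterGateway gateway = jobMasters.get(jobId);
            if (gateway != null) {
                gateway.notifyAllocationFailure(allocationId, cause);
            }
        };

        SlotManagerSketch slotManager = new SlotManagerSketch(actions);
        Slot allocated = new Slot("tm-1");
        allocated.allocationId = "alloc-1";
        allocated.jobId = "job-1";
        slotManager.slots.put("tm-1:0", allocated);
        slotManager.slots.put("tm-1:1", new Slot("tm-1")); // free slot: removed without notification
        slotManager.slots.put("tm-2:0", new Slot("tm-2")); // other TaskManager: untouched

        slotManager.unregisterTaskManager("tm-1", new Exception("TaskManager killed"));
        return slotPool.failedAllocations;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [alloc-1]
    }
}
```

As the comment notes, this sends one message per allocated slot, but no special-case grouping by job is needed on the `SlotManager` side.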
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198491333
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
* not available (yet).
*/
CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+ /**
+ * Notifies that the task manager has terminated.
+ *
+ * @param resourceID identifying the task manager
+ * @param allocationIDs held by this job that belong to the task manager
+ * @param cause of the task manager termination
+ */
+ void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
--- End diff --
Method names should usually be verbs. What about `notifyTaskManagerTermination`?
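A minimal sketch combining the naming suggestion with the earlier remark that the `allocationIDs` parameter is not needed. It assumes the slots have already been offered to the JobMaster, so the JobMaster can derive the affected allocations from its own bookkeeping; the FLINK-9351 case (slots assigned before the JobMaster connected) is not modeled. `JobMasterSketch`, `slotOffered` and the `String` IDs are hypothetical stand-ins, not Flink's `JobMaster`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TaskManagerTerminationNamingSketch {

    /** Hypothetical gateway method: a verb-based name and no allocationIDs parameter. */
    interface JobMasterGateway {
        void notifyTaskManagerTermination(String resourceId, Exception cause);
    }

    static class JobMasterSketch implements JobMasterGateway {
        // The JobMaster's own view: which allocations it holds on which TaskManager.
        final Map<String, Set<String>> allocationsByTaskManager = new HashMap<>();
        final List<String> failedAllocations = new ArrayList<>();

        /** Records a slot that a TaskManager offered to (and was accepted by) this JobMaster. */
        void slotOffered(String resourceId, String allocationId) {
            allocationsByTaskManager.computeIfAbsent(resourceId, k -> new HashSet<>()).add(allocationId);
        }

        @Override
        public void notifyTaskManagerTermination(String resourceId, Exception cause) {
            // Derive the affected allocations locally instead of receiving them from the ResourceManager.
            Set<String> affected = allocationsByTaskManager.remove(resourceId);
            if (affected != null) {
                failedAllocations.addAll(affected);
            }
        }
    }
}
```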
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198491683
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, Void payload) {
return CompletableFuture.completedFuture(null);
}
}
+
+ protected void notifyTaskManagerCompleted(ResourceID resourceID, Exception cause) {
+ WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
+ if (workerRegistration != null) {
+ slotManager.notifyTaskManagerFailed(resourceID, workerRegistration.getInstanceID(), cause);
+ } else {
+ log.warn("TaskManager failed before registering with ResourceManager successfully.");
--- End diff --
This should be a debug log message.
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198490490
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
operatorBackPressureStats.orElse(null)));
}
+ @Override
+ public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
--- End diff --
What do we need the `allocationIds` parameter for here?
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198491884
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
@@ -53,4 +56,13 @@
* @param cause of the allocation failure
*/
void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
+
+ /**
+ * Notifies that the task manager has been terminated.
--- End diff --
A line break is missing here.
---
[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6132
Hi @tillrohrmann, I updated the PR. Could you please have another look?
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198493145
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---
@@ -1202,6 +1209,111 @@ public void testSlotRequestFailure() throws Exception {
}
}
+ /**
+ * Tests notify the job manager when the task manager is failed/killed.
+ */
+ @Test
+ public void testNotifyTaskManagerFailed() throws Exception {
+
+ final List<Tuple4<JobID, ResourceID, Set<AllocationID>, Exception>> notifiedTaskManagerInfos = new ArrayList<>();
+
+ try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
+ @Override
+ public void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause) {
+ notifiedTaskManagerInfos.add(new Tuple4<>(jobId, resourceID, allocationIDs, cause));
+ }
+ })) {
--- End diff --
Indentation looks a bit off here
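One possible way to keep the indentation flat, sketched here as an assumption rather than the PR's actual test, is to create the recording callback as a local variable before the `try`-with-resources header. The `ResourceActions` interface, `Notification` and `SlotManagerStub` below are hypothetical stand-ins so the example compiles without Flink or JUnit; if `TestingResourceActions` is a class rather than an interface, assigning an anonymous subclass to a local variable achieves the same layout.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordingResourceActionsSketch {

    /** Hypothetical stand-in for the callback the test intercepts. */
    interface ResourceActions {
        void notifyTaskManagerTerminated(String jobId, String resourceId, Exception cause);
    }

    /** Captured arguments of one notification (replaces the Tuple4 used in the test above). */
    static final class Notification {
        final String jobId;
        final String resourceId;
        final Exception cause;

        Notification(String jobId, String resourceId, Exception cause) {
            this.jobId = jobId;
            this.resourceId = resourceId;
            this.cause = cause;
        }
    }

    /** Hypothetical AutoCloseable stand-in for the SlotManager under test. */
    static final class SlotManagerStub implements AutoCloseable {
        private final ResourceActions resourceActions;

        SlotManagerStub(ResourceActions resourceActions) {
            this.resourceActions = resourceActions;
        }

        void taskManagerFailed(String jobId, String resourceId, Exception cause) {
            resourceActions.notifyTaskManagerTerminated(jobId, resourceId, cause);
        }

        @Override
        public void close() {}
    }

    static List<Notification> runScenario() {
        final List<Notification> notifications = new ArrayList<>();

        // Declaring the recording callback first keeps the try header short and evenly indented.
        final ResourceActions recordingActions =
            (jobId, resourceId, cause) -> notifications.add(new Notification(jobId, resourceId, cause));

        try (SlotManagerStub slotManager = new SlotManagerStub(recordingActions)) {
            slotManager.taskManagerFailed("job-1", "tm-1", new Exception("killed"));
        }
        return notifications;
    }
}
```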
---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6132#discussion_r198489897
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---
@@ -619,7 +619,11 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) {
startNewWorker(launched.profile());
}
- closeTaskManagerConnection(id, new Exception(status.getMessage()));
+ final Exception terminatedCause = new Exception(status.getMessage());
--- End diff --
Let's call it `terminationCause`.
---