You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/01 22:19:04 UTC

[2/2] flink git commit: [FLINK-9456] Let ResourceManager notify JobManager about failed/killed TaskManagers.

[FLINK-9456] Let ResourceManager notify JobManager about failed/killed TaskManagers.

This closes #6132.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50c0ea8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50c0ea8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50c0ea8c

Branch: refs/heads/master
Commit: 50c0ea8c9fe17278d45aba476a95791152a1420b
Parents: 432e48a
Author: sihuazhou <su...@163.com>
Authored: Thu May 10 14:36:27 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Jul 1 21:10:04 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/types/TaskManagerSlot.java |  19 +++-
 .../flink/runtime/jobmaster/JobMaster.java      |   5 +
 .../runtime/jobmaster/JobMasterGateway.java     |   8 ++
 .../runtime/jobmaster/slotpool/SlotPool.java    |   2 +-
 .../resourcemanager/ResourceManager.java        |   5 +
 .../slotmanager/SlotManager.java                |  45 +++++---
 .../runtime/taskexecutor/TaskExecutor.java      |   3 +-
 .../exceptions/SlotOccupiedException.java       |  16 ++-
 .../utils/TestingJobMasterGateway.java          |   5 +
 .../slotmanager/SlotManagerTest.java            | 103 ++++++++++++++++++-
 10 files changed, 186 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
index fb7fce3..be39424 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -45,6 +48,10 @@ public class TaskManagerSlot {
 	/** Allocation id for which this slot has been allocated. */
 	private AllocationID allocationId;
 
+	/** Job id for which this slot has been allocated. */
+	@Nullable
+	private JobID jobId;
+
 	/** Assigned slot request if there is currently an ongoing request. */
 	private PendingSlotRequest assignedSlotRequest;
 
@@ -83,6 +90,10 @@ public class TaskManagerSlot {
 		return allocationId;
 	}
 
+	public JobID getJobId() {
+		return jobId;
+	}
+
 	public PendingSlotRequest getAssignedSlotRequest() {
 		return assignedSlotRequest;
 	}
@@ -96,6 +107,7 @@ public class TaskManagerSlot {
 
 		state = State.FREE;
 		allocationId = null;
+		jobId = null;
 	}
 
 	public void clearPendingSlotRequest() {
@@ -112,21 +124,24 @@ public class TaskManagerSlot {
 		assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
 	}
 
-	public void completeAllocation(AllocationID allocationId) {
+	public void completeAllocation(AllocationID allocationId, JobID jobId) {
 		Preconditions.checkNotNull(allocationId, "Allocation id must not be null.");
+		Preconditions.checkNotNull(jobId, "Job id must not be null.");
 		Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
 		Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
 
 		state = State.ALLOCATED;
 		this.allocationId = allocationId;
+		this.jobId = jobId;
 		assignedSlotRequest = null;
 	}
 
-	public void updateAllocation(AllocationID allocationId) {
+	public void updateAllocation(AllocationID allocationId, JobID jobId) {
 		Preconditions.checkState(state == State.FREE, "The slot has to be free in order to set an allocation id.");
 
 		state = State.ALLOCATED;
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.jobId = Preconditions.checkNotNull(jobId);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e4a1b6a..7557bc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -980,6 +980,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			operatorBackPressureStats.orElse(null)));
 	}
 
+	@Override
+	public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
+		slotPool.failAllocation(allocationID, cause);
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4ea9357..981222d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -278,4 +278,12 @@ public interface JobMasterGateway extends
 	 * not available (yet).
 	 */
 	CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+	/**
+	 * Notifies that the allocation has failed.
+	 *
+	 * @param allocationID the failed allocation id.
+	 * @param cause the reason that the allocation failed
+	 */
+	void notifyAllocationFailure(AllocationID allocationID, Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 81b3e24..27440a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				allocatedSlot.releasePayload(cause);
 			}
 			else {
-				log.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
+				log.trace("Outdated request to fail slot [{}] with ", allocationID, cause);
 			}
 		}
 		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6e5c824..3ea5c2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1013,6 +1013,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
 			validateRunsInMainThread();
 			log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
+
+			JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
+			if (jobManagerRegistration != null) {
+				jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index fe503b2..d0d03f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -343,6 +344,7 @@ public class SlotManager implements AutoCloseable {
 				registerSlot(
 					slotStatus.getSlotID(),
 					slotStatus.getAllocationID(),
+					slotStatus.getJobID(),
 					slotStatus.getResourceProfile(),
 					taskExecutorConnection);
 			}
@@ -392,7 +394,7 @@ public class SlotManager implements AutoCloseable {
 		if (null != taskManagerRegistration) {
 
 			for (SlotStatus slotStatus : slotReport) {
-				updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
+				updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID(), slotStatus.getJobID());
 			}
 
 			return true;
@@ -426,7 +428,7 @@ public class SlotManager implements AutoCloseable {
 							slot.getInstanceId() + " which has not been registered.");
 					}
 
-					updateSlotState(slot, taskManagerRegistration, null);
+					updateSlotState(slot, taskManagerRegistration, null, null);
 				} else {
 					LOG.debug("Received request to free slot {} with expected allocation id {}, " +
 						"but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
@@ -515,6 +517,7 @@ public class SlotManager implements AutoCloseable {
 	private void registerSlot(
 			SlotID slotId,
 			AllocationID allocationId,
+			JobID jobId,
 			ResourceProfile resourceProfile,
 			TaskExecutorConnection taskManagerConnection) {
 
@@ -530,7 +533,7 @@ public class SlotManager implements AutoCloseable {
 
 		slots.put(slotId, slot);
 
-		updateSlot(slotId, allocationId);
+		updateSlot(slotId, allocationId, jobId);
 	}
 
 	/**
@@ -540,14 +543,14 @@ public class SlotManager implements AutoCloseable {
 	 * @param allocationId specifying the current allocation of the slot
 	 * @return True if the slot could be updated; otherwise false
 	 */
-	private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
+	private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) {
 		final TaskManagerSlot slot = slots.get(slotId);
 
 		if (slot != null) {
 			final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
 
 			if (taskManagerRegistration != null) {
-				updateSlotState(slot, taskManagerRegistration, allocationId);
+				updateSlotState(slot, taskManagerRegistration, allocationId, jobId);
 
 				return true;
 			} else {
@@ -561,7 +564,11 @@ public class SlotManager implements AutoCloseable {
 		}
 	}
 
-	private void updateSlotState(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId) {
+	private void updateSlotState(
+		TaskManagerSlot slot,
+		TaskManagerRegistration taskManagerRegistration,
+		@Nullable AllocationID allocationId,
+		@Nullable JobID jobId) {
 		if (null != allocationId) {
 			switch (slot.getState()) {
 				case PENDING:
@@ -575,12 +582,12 @@ public class SlotManager implements AutoCloseable {
 						// remove the pending slot request, since it has been completed
 						pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
 
-						slot.completeAllocation(allocationId);
+						slot.completeAllocation(allocationId, jobId);
 					} else {
 						// we first have to free the slot in order to set a new allocationId
 						slot.clearPendingSlotRequest();
 						// set the allocation id such that the slot won't be considered for the pending slot request
-						slot.updateAllocation(allocationId);
+						slot.updateAllocation(allocationId, jobId);
 
 						// this will try to find a new slot for the request
 						rejectPendingSlotRequest(
@@ -593,13 +600,13 @@ public class SlotManager implements AutoCloseable {
 				case ALLOCATED:
 					if (!Objects.equals(allocationId, slot.getAllocationId())) {
 						slot.freeSlot();
-						slot.updateAllocation(allocationId);
+						slot.updateAllocation(allocationId, jobId);
 					}
 					break;
 				case FREE:
 					// the slot is currently free --> it is stored in freeSlots
 					freeSlots.remove(slot.getSlotId());
-					slot.updateAllocation(allocationId);
+					slot.updateAllocation(allocationId, jobId);
 					taskManagerRegistration.occupySlot();
 					break;
 			}
@@ -660,15 +667,16 @@ public class SlotManager implements AutoCloseable {
 		final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
 		final AllocationID allocationId = pendingSlotRequest.getAllocationId();
 		final SlotID slotId = taskManagerSlot.getSlotId();
+		final InstanceID instanceID = taskManagerSlot.getInstanceId();
 
 		taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
 		pendingSlotRequest.setRequestFuture(completableFuture);
 
-		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
 
 		if (taskManagerRegistration == null) {
 			throw new IllegalStateException("Could not find a registered task manager for instance id " +
-				taskManagerSlot.getInstanceId() + '.');
+				instanceID + '.');
 		}
 
 		taskManagerRegistration.markUsed();
@@ -695,11 +703,11 @@ public class SlotManager implements AutoCloseable {
 			(Acknowledge acknowledge, Throwable throwable) -> {
 				try {
 					if (acknowledge != null) {
-						updateSlot(slotId, allocationId);
+						updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
 					} else {
 						if (throwable instanceof SlotOccupiedException) {
 							SlotOccupiedException exception = (SlotOccupiedException) throwable;
-							updateSlot(slotId, exception.getAllocationId());
+							updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
 						} else {
 							removeSlotRequestFromSlot(slotId, allocationId);
 						}
@@ -765,8 +773,11 @@ public class SlotManager implements AutoCloseable {
 			}
 
 			AllocationID oldAllocationId = slot.getAllocationId();
-
-			fulfilledSlotRequests.remove(oldAllocationId);
+			if (oldAllocationId != null) {
+				fulfilledSlotRequests.remove(oldAllocationId);
+				resourceActions.notifyAllocationFailure(
+					slot.getJobId(), oldAllocationId, new Exception("The assigned slot " + slot.getSlotId() + " was removed."));
+			}
 		} else {
 			LOG.debug("There was no slot registered with slot id {}.", slotId);
 		}
@@ -798,7 +809,7 @@ public class SlotManager implements AutoCloseable {
 				// clear the pending slot request
 				taskManagerSlot.clearPendingSlotRequest();
 
-				updateSlotState(taskManagerSlot, taskManagerRegistration, null);
+				updateSlotState(taskManagerSlot, taskManagerRegistration, null, null);
 			} else {
 				LOG.debug("Ignore slot request removal for slot {}.", slotId);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index dda2688..ae69e56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -762,7 +762,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 				log.info(message);
 
-				throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
+				final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
+				throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
 			}
 
 			if (jobManagerTable.contains(jobId)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
index 93e67a8..818754c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor.exceptions;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.util.Preconditions;
 
@@ -26,22 +27,31 @@ public class SlotOccupiedException extends SlotAllocationException {
 
 	private final AllocationID allocationId;
 
-	public SlotOccupiedException(String message, AllocationID allocationId) {
+	private final JobID jobId;
+
+	public SlotOccupiedException(String message, AllocationID allocationId, JobID jobId) {
 		super(message);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.jobId = jobId;
 	}
 
-	public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId) {
+	public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId, JobID jobId) {
 		super(message, cause);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.jobId = jobId;
 	}
 
-	public SlotOccupiedException(Throwable cause, AllocationID allocationId) {
+	public SlotOccupiedException(Throwable cause, AllocationID allocationId, JobID jobId) {
 		super(cause);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.jobId = jobId;
 	}
 
 	public AllocationID getAllocationId() {
 		return allocationId;
 	}
+
+	public JobID getJobId() {
+		return jobId;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 65117af..e887fc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -166,6 +166,11 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 	}
 
 	@Override
+	public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
 		throw new UnsupportedOperationException();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index af6f3e4..1b072d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -48,7 +49,12 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -57,6 +63,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -1148,7 +1156,7 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Testst that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
+	 * Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
 	 * fails.
 	 */
 	@Test
@@ -1202,6 +1210,99 @@ public class SlotManagerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the job manager is notified about the failed allocations when a task manager is failed/killed.
+	 */
+	@Test
+	public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception {
+
+		final List<Tuple2<JobID, AllocationID>> notifiedTaskManagerInfos = new ArrayList<>();
+
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
+				@Override
+				public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
+					notifiedTaskManagerInfos.add(new Tuple2<>(jobId, allocationId));
+				}})) {
+
+			// register slot request for job1.
+			JobID jobId1 = new JobID();
+			final SlotRequest slotRequest11 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+			final SlotRequest slotRequest12 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+			slotManager.registerSlotRequest(slotRequest11);
+			slotManager.registerSlotRequest(slotRequest12);
+
+			// create task-manager-1 with 2 slots.
+			final ResourceID taskExecutorResourceId1 = ResourceID.generate();
+			final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder()
+				.createTestingTaskExecutorGateway();
+			final TaskExecutorConnection taskExecutionConnection1 = new TaskExecutorConnection(taskExecutorResourceId1, testingTaskExecutorGateway1);
+			final Set<SlotStatus> tm1SlotStatusList = new HashSet<>();
+			tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 0), ResourceProfile.UNKNOWN));
+			tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 1), ResourceProfile.UNKNOWN));
+
+			// register the task-manager-1 to the slot manager, this will trigger the slot allocation for job1.
+			slotManager.registerTaskManager(taskExecutionConnection1, new SlotReport(tm1SlotStatusList));
+
+			// register slot request for job2.
+			JobID jobId2 = new JobID();
+			final SlotRequest slotRequest21 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+			final SlotRequest slotRequest22 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+			slotManager.registerSlotRequest(slotRequest21);
+			slotManager.registerSlotRequest(slotRequest22);
+
+			// register slot request for job3.
+			JobID jobId3 = new JobID();
+			final SlotRequest slotRequest31 = new SlotRequest(jobId3, new AllocationID(), ResourceProfile.UNKNOWN, "foobar3");
+			slotManager.registerSlotRequest(slotRequest31);
+
+			// create task-manager-2 with 4 slots (one more than the 3 slot requests of job2 and job3).
+			final ResourceID taskExecutorResourceId2 = ResourceID.generate();
+			final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder()
+				.createTestingTaskExecutorGateway();
+			final TaskExecutorConnection taskExecutionConnection2 = new TaskExecutorConnection(taskExecutorResourceId2, testingTaskExecutorGateway2);
+			final Set<SlotStatus> tm2SlotStatusList = new HashSet<>();
+			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 0), ResourceProfile.UNKNOWN));
+			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 1), ResourceProfile.UNKNOWN));
+			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 2), ResourceProfile.UNKNOWN));
+			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 3), ResourceProfile.UNKNOWN));
+
+			// register the task-manager-2 to the slot manager, this will trigger the slot allocation for job2 and job3.
+			slotManager.registerTaskManager(taskExecutionConnection2, new SlotReport(tm2SlotStatusList));
+
+			// --------------------- validate the notifications for terminated task managers ------------------------
+
+			// validate for job1.
+			slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID());
+
+			assertEquals(2, notifiedTaskManagerInfos.size());
+
+			assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(0).f0));
+			assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(1).f0));
+
+			assertEquals(Stream.of(slotRequest11.getAllocationId(), slotRequest12.getAllocationId()).collect(Collectors.toSet()),
+				Stream.of(notifiedTaskManagerInfos.get(0).f1, notifiedTaskManagerInfos.get(1).f1).collect(Collectors.toSet()));
+
+			notifiedTaskManagerInfos.clear();
+
+			// validate the result for job2 and job3.
+			slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID());
+
+			assertEquals(3, notifiedTaskManagerInfos.size());
+
+			Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = notifiedTaskManagerInfos.stream().collect(Collectors.groupingBy(tuple -> tuple.f0));
+
+			assertEquals(2, job2AndJob3FailedAllocationInfo.size());
+
+			// validate for job2
+			assertEquals(Stream.of(slotRequest21.getAllocationId(), slotRequest22.getAllocationId()).collect(Collectors.toSet()),
+				job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
+
+			// validate for job3
+			assertEquals(Stream.of(slotRequest31.getAllocationId()).collect(Collectors.toSet()),
+				job2AndJob3FailedAllocationInfo.get(jobId3).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
+		}
+	}
+
 	private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
 		SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),