You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/01 22:19:03 UTC

[1/2] flink git commit: [FLINK-9456] Reuse TestingResourceActions in SlotManagerTest#testNotifyFailedAllocationWhenTaskManagerTerminated

Repository: flink
Updated Branches:
  refs/heads/master 432e48a2e -> 89cfeaa88


[FLINK-9456] Reuse TestingResourceActions in SlotManagerTest#testNotifyFailedAllocationWhenTaskManagerTerminated


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89cfeaa8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89cfeaa8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89cfeaa8

Branch: refs/heads/master
Commit: 89cfeaa882f9e68df2bd215563622b48c29a9ec9
Parents: 50c0ea8
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 1 21:08:41 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Jul 1 21:10:04 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/types/TaskManagerSlot.java |   1 +
 .../slotmanager/SlotManager.java                |  15 ++-
 .../exceptions/SlotOccupiedException.java       |  18 +--
 .../slotmanager/SlotManagerTest.java            | 113 +++++++++++--------
 .../slotmanager/TestingResourceActions.java     |  32 +++++-
 .../TestingResourceActionsBuilder.java          |  56 +++++++++
 6 files changed, 167 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
index be39424..633f31a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
@@ -90,6 +90,7 @@ public class TaskManagerSlot {
 		return allocationId;
 	}
 
+	@Nullable
 	public JobID getJobId() {
 		return jobId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d0d03f5..b2dbba8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -541,6 +541,7 @@ public class SlotManager implements AutoCloseable {
 	 *
 	 * @param slotId to update
 	 * @param allocationId specifying the current allocation of the slot
+	 * @param jobId specifying the job to which the slot is allocated
 	 * @return True if the slot could be updated; otherwise false
 	 */
 	private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) {
@@ -565,10 +566,10 @@ public class SlotManager implements AutoCloseable {
 	}
 
 	private void updateSlotState(
-		TaskManagerSlot slot,
-		TaskManagerRegistration taskManagerRegistration,
-		@Nullable AllocationID allocationId,
-		@Nullable JobID jobId) {
+			TaskManagerSlot slot,
+			TaskManagerRegistration taskManagerRegistration,
+			@Nullable AllocationID allocationId,
+			@Nullable JobID jobId) {
 		if (null != allocationId) {
 			switch (slot.getState()) {
 				case PENDING:
@@ -773,10 +774,14 @@ public class SlotManager implements AutoCloseable {
 			}
 
 			AllocationID oldAllocationId = slot.getAllocationId();
+
 			if (oldAllocationId != null) {
 				fulfilledSlotRequests.remove(oldAllocationId);
+
 				resourceActions.notifyAllocationFailure(
-					slot.getJobId(), oldAllocationId, new Exception("The assigned slot " + slot.getSlotId() + " was removed."));
+					slot.getJobId(),
+					oldAllocationId,
+					new FlinkException("The assigned slot " + slot.getSlotId() + " was removed."));
 			}
 		} else {
 			LOG.debug("There was no slot registered with slot id {}.", slotId);

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
index 818754c..cb528de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
@@ -22,6 +22,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.util.Preconditions;
 
+/**
+ * Exception which signals that a slot is already occupied by the given
+ * {@link AllocationID}.
+ */
 public class SlotOccupiedException extends SlotAllocationException {
 	private static final long serialVersionUID = -3986333914244338888L;
 
@@ -32,19 +36,7 @@ public class SlotOccupiedException extends SlotAllocationException {
 	public SlotOccupiedException(String message, AllocationID allocationId, JobID jobId) {
 		super(message);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
-		this.jobId = jobId;
-	}
-
-	public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId, JobID jobId) {
-		super(message, cause);
-		this.allocationId = Preconditions.checkNotNull(allocationId);
-		this.jobId = jobId;
-	}
-
-	public SlotOccupiedException(Throwable cause, AllocationID allocationId, JobID jobId) {
-		super(cause);
-		this.allocationId = Preconditions.checkNotNull(allocationId);
-		this.jobId = jobId;
+		this.jobId = Preconditions.checkNotNull(jobId);
 	}
 
 	public AllocationID getAllocationId() {

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 1b072d7..fb82aa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -49,11 +50,14 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import java.util.ArrayList;
+import javax.annotation.Nonnull;
+
+import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -64,9 +68,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1161,7 +1166,8 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testSlotRequestFailure() throws Exception {
-		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions())) {
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(),
+			new TestingResourceActionsBuilder().createTestingResourceActions())) {
 
 			final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
 			slotManager.registerSlotRequest(slotRequest);
@@ -1216,93 +1222,106 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception {
 
-		final List<Tuple2<JobID, AllocationID>> notifiedTaskManagerInfos = new ArrayList<>();
+		final Queue<Tuple2<JobID, AllocationID>> allocationFailures = new ArrayDeque<>(5);
 
-		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
-				@Override
-				public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
-					notifiedTaskManagerInfos.add(new Tuple2<>(jobId, allocationId));
-				}})) {
+		final TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setNotifyAllocationFailureConsumer(
+				(Tuple3<JobID, AllocationID, Exception> failureMessage) ->
+					allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1)))
+			.createTestingResourceActions();
+
+		try (final SlotManager slotManager = createSlotManager(
+			ResourceManagerId.generate(),
+			resourceManagerActions)) {
 
 			// register slot request for job1.
 			JobID jobId1 = new JobID();
-			final SlotRequest slotRequest11 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
-			final SlotRequest slotRequest12 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+			final SlotRequest slotRequest11 = createSlotRequest(jobId1);
+			final SlotRequest slotRequest12 = createSlotRequest(jobId1);
 			slotManager.registerSlotRequest(slotRequest11);
 			slotManager.registerSlotRequest(slotRequest12);
 
 			// create task-manager-1 with 2 slots.
 			final ResourceID taskExecutorResourceId1 = ResourceID.generate();
-			final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder()
-				.createTestingTaskExecutorGateway();
+			final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 			final TaskExecutorConnection taskExecutionConnection1 = new TaskExecutorConnection(taskExecutorResourceId1, testingTaskExecutorGateway1);
-			final Set<SlotStatus> tm1SlotStatusList = new HashSet<>();
-			tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 0), ResourceProfile.UNKNOWN));
-			tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 1), ResourceProfile.UNKNOWN));
+			final SlotReport slotReport1 = createSlotReport(taskExecutorResourceId1, 2);
 
 			// register the task-manager-1 to the slot manager, this will trigger the slot allocation for job1.
-			slotManager.registerTaskManager(taskExecutionConnection1, new SlotReport(tm1SlotStatusList));
+			slotManager.registerTaskManager(taskExecutionConnection1, slotReport1);
 
 			// register slot request for job2.
 			JobID jobId2 = new JobID();
-			final SlotRequest slotRequest21 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
-			final SlotRequest slotRequest22 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+			final SlotRequest slotRequest21 = createSlotRequest(jobId2);
+			final SlotRequest slotRequest22 = createSlotRequest(jobId2);
 			slotManager.registerSlotRequest(slotRequest21);
 			slotManager.registerSlotRequest(slotRequest22);
 
 			// register slot request for job3.
 			JobID jobId3 = new JobID();
-			final SlotRequest slotRequest31 = new SlotRequest(jobId3, new AllocationID(), ResourceProfile.UNKNOWN, "foobar3");
+			final SlotRequest slotRequest31 = createSlotRequest(jobId3);
 			slotManager.registerSlotRequest(slotRequest31);
 
 			// create task-manager-2 with 3 slots.
 			final ResourceID taskExecutorResourceId2 = ResourceID.generate();
-			final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder()
-				.createTestingTaskExecutorGateway();
+			final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 			final TaskExecutorConnection taskExecutionConnection2 = new TaskExecutorConnection(taskExecutorResourceId2, testingTaskExecutorGateway2);
-			final Set<SlotStatus> tm2SlotStatusList = new HashSet<>();
-			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 0), ResourceProfile.UNKNOWN));
-			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 1), ResourceProfile.UNKNOWN));
-			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 2), ResourceProfile.UNKNOWN));
-			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 3), ResourceProfile.UNKNOWN));
+			final SlotReport slotReport2 = createSlotReport(taskExecutorResourceId2, 3);
 
 			// register the task-manager-2 to the slot manager, this will trigger the slot allocation for job2 and job3.
-			slotManager.registerTaskManager(taskExecutionConnection2, new SlotReport(tm2SlotStatusList));
-
-			// --------------------- valid the notify task manager terminated ------------------------
+			slotManager.registerTaskManager(taskExecutionConnection2, slotReport2);
 
-			// valid for job1.
+			// validate for job1.
 			slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID());
 
-			assertEquals(2, notifiedTaskManagerInfos.size());
+			assertThat(allocationFailures, hasSize(2));
 
-			assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(0).f0));
-			assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(1).f0));
+			Tuple2<JobID, AllocationID> allocationFailure;
+			final Set<AllocationID> failedAllocations = new HashSet<>(2);
 
-			assertEquals(Stream.of(slotRequest11.getAllocationId(), slotRequest12.getAllocationId()).collect(Collectors.toSet()),
-				Stream.of(notifiedTaskManagerInfos.get(0).f1, notifiedTaskManagerInfos.get(1).f1).collect(Collectors.toSet()));
+			while ((allocationFailure = allocationFailures.poll()) != null) {
+				assertThat(allocationFailure.f0, equalTo(jobId1));
+				failedAllocations.add(allocationFailure.f1);
+			}
 
-			notifiedTaskManagerInfos.clear();
+			assertThat(failedAllocations, containsInAnyOrder(slotRequest11.getAllocationId(), slotRequest12.getAllocationId()));
 
-			// valid the result for job2 and job3.
+			// validate the result for job2 and job3.
 			slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID());
 
-			assertEquals(3, notifiedTaskManagerInfos.size());
+			assertThat(allocationFailures, hasSize(3));
 
-			Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = notifiedTaskManagerInfos.stream().collect(Collectors.groupingBy(tuple -> tuple.f0));
+			Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = allocationFailures.stream().collect(Collectors.groupingBy(tuple -> tuple.f0));
 
-			assertEquals(2, job2AndJob3FailedAllocationInfo.size());
+			assertThat(job2AndJob3FailedAllocationInfo.entrySet(), hasSize(2));
 
-			// valid for job2
-			assertEquals(Stream.of(slotRequest21.getAllocationId(), slotRequest22.getAllocationId()).collect(Collectors.toSet()),
-				job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
+			final Set<AllocationID> job2FailedAllocations = extractFailedAllocationsForJob(jobId2, job2AndJob3FailedAllocationInfo);
+			final Set<AllocationID> job3FailedAllocations = extractFailedAllocationsForJob(jobId3, job2AndJob3FailedAllocationInfo);
 
-			// valid for job3
-			assertEquals(Stream.of(slotRequest31.getAllocationId()).collect(Collectors.toSet()),
-				job2AndJob3FailedAllocationInfo.get(jobId3).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
+			assertThat(job2FailedAllocations, containsInAnyOrder(slotRequest21.getAllocationId(), slotRequest22.getAllocationId()));
+			assertThat(job3FailedAllocations, containsInAnyOrder(slotRequest31.getAllocationId()));
 		}
 	}
 
+	private Set<AllocationID> extractFailedAllocationsForJob(JobID jobId2, Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo) {
+		return job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(t -> t.f1).collect(Collectors.toSet());
+	}
+
+	@Nonnull
+	private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
+		final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots);
+		for (int i = 0; i < numberSlots; i++) {
+			slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), ResourceProfile.UNKNOWN));
+		}
+
+		return new SlotReport(slotStatusSet);
+	}
+
+	@Nonnull
+	private SlotRequest createSlotRequest(JobID jobId1) {
+		return new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+	}
+
 	private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
 		SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
index 915f142..8b7c802 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
@@ -19,26 +19,52 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
 
+import javax.annotation.Nonnull;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
 /**
  * Testing implementation of the {@link ResourceActions}.
  */
 public class TestingResourceActions implements ResourceActions {
+
+	@Nonnull
+	private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
+
+	@Nonnull
+	private final Consumer<ResourceProfile> allocateResourceConsumer;
+
+	@Nonnull
+	private final Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer;
+
+	public TestingResourceActions(
+			@Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
+			@Nonnull Consumer<ResourceProfile> allocateResourceConsumer,
+			@Nonnull Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer) {
+		this.releaseResourceConsumer = releaseResourceConsumer;
+		this.allocateResourceConsumer = allocateResourceConsumer;
+		this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer;
+	}
+
+
 	@Override
 	public void releaseResource(InstanceID instanceId, Exception cause) {
-
+		releaseResourceConsumer.accept(instanceId, cause);
 	}
 
 	@Override
 	public void allocateResource(ResourceProfile resourceProfile) {
-
+		allocateResourceConsumer.accept(resourceProfile);
 	}
 
 	@Override
 	public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
-
+		notifyAllocationFailureConsumer.accept(Tuple3.of(jobId, allocationId, cause));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89cfeaa8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
new file mode 100644
index 0000000..2c1d47e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Builder for the {@link TestingResourceActions}.
+ */
+public class TestingResourceActionsBuilder {
+	private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {};
+	private Consumer<ResourceProfile> allocateResourceConsumer = (ignored) -> {};
+	private Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer = (ignored) -> {};
+
+	public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
+		this.releaseResourceConsumer = releaseResourceConsumer;
+		return this;
+	}
+
+	public TestingResourceActionsBuilder setAllocateResourceConsumer(Consumer<ResourceProfile> allocateResourceConsumer) {
+		this.allocateResourceConsumer = allocateResourceConsumer;
+		return this;
+	}
+
+	public TestingResourceActionsBuilder setNotifyAllocationFailureConsumer(Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer) {
+		this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer;
+		return this;
+	}
+
+	public TestingResourceActions createTestingResourceActions() {
+		return new TestingResourceActions(releaseResourceConsumer, allocateResourceConsumer, notifyAllocationFailureConsumer);
+	}
+}


[2/2] flink git commit: [FLINK-9456] Let ResourceManager notify JobManager about failed/killed TaskManagers.

Posted by tr...@apache.org.
[FLINK-9456] Let ResourceManager notify JobManager about failed/killed TaskManagers.

This closes #6132.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50c0ea8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50c0ea8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50c0ea8c

Branch: refs/heads/master
Commit: 50c0ea8c9fe17278d45aba476a95791152a1420b
Parents: 432e48a
Author: sihuazhou <su...@163.com>
Authored: Thu May 10 14:36:27 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Jul 1 21:10:04 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/types/TaskManagerSlot.java |  19 +++-
 .../flink/runtime/jobmaster/JobMaster.java      |   5 +
 .../runtime/jobmaster/JobMasterGateway.java     |   8 ++
 .../runtime/jobmaster/slotpool/SlotPool.java    |   2 +-
 .../resourcemanager/ResourceManager.java        |   5 +
 .../slotmanager/SlotManager.java                |  45 +++++---
 .../runtime/taskexecutor/TaskExecutor.java      |   3 +-
 .../exceptions/SlotOccupiedException.java       |  16 ++-
 .../utils/TestingJobMasterGateway.java          |   5 +
 .../slotmanager/SlotManagerTest.java            | 103 ++++++++++++++++++-
 10 files changed, 186 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
index fb7fce3..be39424 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -45,6 +48,10 @@ public class TaskManagerSlot {
 	/** Allocation id for which this slot has been allocated. */
 	private AllocationID allocationId;
 
+	/** Job id for which this slot has been allocated. */
+	@Nullable
+	private JobID jobId;
+
 	/** Assigned slot request if there is currently an ongoing request. */
 	private PendingSlotRequest assignedSlotRequest;
 
@@ -83,6 +90,10 @@ public class TaskManagerSlot {
 		return allocationId;
 	}
 
+	public JobID getJobId() {
+		return jobId;
+	}
+
 	public PendingSlotRequest getAssignedSlotRequest() {
 		return assignedSlotRequest;
 	}
@@ -96,6 +107,7 @@ public class TaskManagerSlot {
 
 		state = State.FREE;
 		allocationId = null;
+		jobId = null;
 	}
 
 	public void clearPendingSlotRequest() {
@@ -112,21 +124,24 @@ public class TaskManagerSlot {
 		assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
 	}
 
-	public void completeAllocation(AllocationID allocationId) {
+	public void completeAllocation(AllocationID allocationId, JobID jobId) {
 		Preconditions.checkNotNull(allocationId, "Allocation id must not be null.");
+		Preconditions.checkNotNull(jobId, "Job id must not be null.");
 		Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
 		Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
 
 		state = State.ALLOCATED;
 		this.allocationId = allocationId;
+		this.jobId = jobId;
 		assignedSlotRequest = null;
 	}
 
-	public void updateAllocation(AllocationID allocationId) {
+	public void updateAllocation(AllocationID allocationId, JobID jobId) {
 		Preconditions.checkState(state == State.FREE, "The slot has to be free in order to set an allocation id.");
 
 		state = State.ALLOCATED;
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.jobId = Preconditions.checkNotNull(jobId);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e4a1b6a..7557bc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -980,6 +980,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			operatorBackPressureStats.orElse(null)));
 	}
 
+	@Override
+	public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
+		slotPool.failAllocation(allocationID, cause);
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4ea9357..981222d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -278,4 +278,12 @@ public interface JobMasterGateway extends
 	 * not available (yet).
 	 */
 	CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+	/**
+	 * Notifies that the allocation has failed.
+	 *
+	 * @param allocationID the failed allocation id.
+	 * @param cause the reason that the allocation failed
+	 */
+	void notifyAllocationFailure(AllocationID allocationID, Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 81b3e24..27440a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				allocatedSlot.releasePayload(cause);
 			}
 			else {
-				log.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
+				log.trace("Outdated request to fail slot [{}] with ", allocationID, cause);
 			}
 		}
 		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6e5c824..3ea5c2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1013,6 +1013,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
 			validateRunsInMainThread();
 			log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
+
+			JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
+			if (jobManagerRegistration != null) {
+				jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index fe503b2..d0d03f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -343,6 +344,7 @@ public class SlotManager implements AutoCloseable {
 				registerSlot(
 					slotStatus.getSlotID(),
 					slotStatus.getAllocationID(),
+					slotStatus.getJobID(),
 					slotStatus.getResourceProfile(),
 					taskExecutorConnection);
 			}
@@ -392,7 +394,7 @@ public class SlotManager implements AutoCloseable {
 		if (null != taskManagerRegistration) {
 
 			for (SlotStatus slotStatus : slotReport) {
-				updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
+				updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID(), slotStatus.getJobID());
 			}
 
 			return true;
@@ -426,7 +428,7 @@ public class SlotManager implements AutoCloseable {
 							slot.getInstanceId() + " which has not been registered.");
 					}
 
-					updateSlotState(slot, taskManagerRegistration, null);
+					updateSlotState(slot, taskManagerRegistration, null, null);
 				} else {
 					LOG.debug("Received request to free slot {} with expected allocation id {}, " +
 						"but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
@@ -515,6 +517,7 @@ public class SlotManager implements AutoCloseable {
 	private void registerSlot(
 			SlotID slotId,
 			AllocationID allocationId,
+			JobID jobId,
 			ResourceProfile resourceProfile,
 			TaskExecutorConnection taskManagerConnection) {
 
@@ -530,7 +533,7 @@ public class SlotManager implements AutoCloseable {
 
 		slots.put(slotId, slot);
 
-		updateSlot(slotId, allocationId);
+		updateSlot(slotId, allocationId, jobId);
 	}
 
 	/**
@@ -540,14 +543,14 @@ public class SlotManager implements AutoCloseable {
 	 * @param allocationId specifying the current allocation of the slot
 	 * @return True if the slot could be updated; otherwise false
 	 */
-	private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
+	private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) {
 		final TaskManagerSlot slot = slots.get(slotId);
 
 		if (slot != null) {
 			final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
 
 			if (taskManagerRegistration != null) {
-				updateSlotState(slot, taskManagerRegistration, allocationId);
+				updateSlotState(slot, taskManagerRegistration, allocationId, jobId);
 
 				return true;
 			} else {
@@ -561,7 +564,11 @@ public class SlotManager implements AutoCloseable {
 		}
 	}
 
-	private void updateSlotState(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId) {
+	private void updateSlotState(
+		TaskManagerSlot slot,
+		TaskManagerRegistration taskManagerRegistration,
+		@Nullable AllocationID allocationId,
+		@Nullable JobID jobId) {
 		if (null != allocationId) {
 			switch (slot.getState()) {
 				case PENDING:
@@ -575,12 +582,12 @@ public class SlotManager implements AutoCloseable {
 						// remove the pending slot request, since it has been completed
 						pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
 
-						slot.completeAllocation(allocationId);
+						slot.completeAllocation(allocationId, jobId);
 					} else {
 						// we first have to free the slot in order to set a new allocationId
 						slot.clearPendingSlotRequest();
 						// set the allocation id such that the slot won't be considered for the pending slot request
-						slot.updateAllocation(allocationId);
+						slot.updateAllocation(allocationId, jobId);
 
 						// this will try to find a new slot for the request
 						rejectPendingSlotRequest(
@@ -593,13 +600,13 @@ public class SlotManager implements AutoCloseable {
 				case ALLOCATED:
 					if (!Objects.equals(allocationId, slot.getAllocationId())) {
 						slot.freeSlot();
-						slot.updateAllocation(allocationId);
+						slot.updateAllocation(allocationId, jobId);
 					}
 					break;
 				case FREE:
 					// the slot is currently free --> it is stored in freeSlots
 					freeSlots.remove(slot.getSlotId());
-					slot.updateAllocation(allocationId);
+					slot.updateAllocation(allocationId, jobId);
 					taskManagerRegistration.occupySlot();
 					break;
 			}
@@ -660,15 +667,16 @@ public class SlotManager implements AutoCloseable {
 		final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
 		final AllocationID allocationId = pendingSlotRequest.getAllocationId();
 		final SlotID slotId = taskManagerSlot.getSlotId();
+		final InstanceID instanceID = taskManagerSlot.getInstanceId();
 
 		taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
 		pendingSlotRequest.setRequestFuture(completableFuture);
 
-		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
 
 		if (taskManagerRegistration == null) {
 			throw new IllegalStateException("Could not find a registered task manager for instance id " +
-				taskManagerSlot.getInstanceId() + '.');
+				instanceID + '.');
 		}
 
 		taskManagerRegistration.markUsed();
@@ -695,11 +703,11 @@ public class SlotManager implements AutoCloseable {
 			(Acknowledge acknowledge, Throwable throwable) -> {
 				try {
 					if (acknowledge != null) {
-						updateSlot(slotId, allocationId);
+						updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
 					} else {
 						if (throwable instanceof SlotOccupiedException) {
 							SlotOccupiedException exception = (SlotOccupiedException) throwable;
-							updateSlot(slotId, exception.getAllocationId());
+							updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
 						} else {
 							removeSlotRequestFromSlot(slotId, allocationId);
 						}
@@ -765,8 +773,11 @@ public class SlotManager implements AutoCloseable {
 			}
 
 			AllocationID oldAllocationId = slot.getAllocationId();
-
-			fulfilledSlotRequests.remove(oldAllocationId);
+			if (oldAllocationId != null) {
+				fulfilledSlotRequests.remove(oldAllocationId);
+				resourceActions.notifyAllocationFailure(
+					slot.getJobId(), oldAllocationId, new Exception("The assigned slot " + slot.getSlotId() + " was removed."));
+			}
 		} else {
 			LOG.debug("There was no slot registered with slot id {}.", slotId);
 		}
@@ -798,7 +809,7 @@ public class SlotManager implements AutoCloseable {
 				// clear the pending slot request
 				taskManagerSlot.clearPendingSlotRequest();
 
-				updateSlotState(taskManagerSlot, taskManagerRegistration, null);
+				updateSlotState(taskManagerSlot, taskManagerRegistration, null, null);
 			} else {
 				LOG.debug("Ignore slot request removal for slot {}.", slotId);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index dda2688..ae69e56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -762,7 +762,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 				log.info(message);
 
-				throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
+				final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
+				throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
 			}
 
 			if (jobManagerTable.contains(jobId)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
index 93e67a8..818754c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor.exceptions;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.util.Preconditions;
 
@@ -26,22 +27,31 @@ public class SlotOccupiedException extends SlotAllocationException {
 
 	private final AllocationID allocationId;
 
-	public SlotOccupiedException(String message, AllocationID allocationId) {
+	private final JobID jobId;
+
+	public SlotOccupiedException(String message, AllocationID allocationId, JobID jobId) {
 		super(message);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.jobId = jobId;
 	}
 
-	public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId) {
+	public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId, JobID jobId) {
 		super(message, cause);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.jobId = jobId;
 	}
 
-	public SlotOccupiedException(Throwable cause, AllocationID allocationId) {
+	public SlotOccupiedException(Throwable cause, AllocationID allocationId, JobID jobId) {
 		super(cause);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.jobId = jobId;
 	}
 
 	public AllocationID getAllocationId() {
 		return allocationId;
 	}
+
+	public JobID getJobId() {
+		return jobId;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 65117af..e887fc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -166,6 +166,11 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 	}
 
 	@Override
+	public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
 		throw new UnsupportedOperationException();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index af6f3e4..1b072d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -48,7 +49,12 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -57,6 +63,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -1148,7 +1156,7 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Testst that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
+	 * Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
 	 * fails.
 	 */
 	@Test
@@ -1202,6 +1210,99 @@ public class SlotManagerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the job manager is notified about its failed allocations when the task manager is failed/killed.
+	 */
+	@Test
+	public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception {
+
+		final List<Tuple2<JobID, AllocationID>> notifiedTaskManagerInfos = new ArrayList<>();
+
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
+				@Override
+				public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
+					notifiedTaskManagerInfos.add(new Tuple2<>(jobId, allocationId));
+				}})) {
+
+			// register slot request for job1.
+			final JobID jobId1 = new JobID();
+			final SlotRequest slotRequest11 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+			final SlotRequest slotRequest12 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+			slotManager.registerSlotRequest(slotRequest11);
+			slotManager.registerSlotRequest(slotRequest12);
+
+			// create task-manager-1 with 2 slots.
+			final ResourceID taskExecutorResourceId1 = ResourceID.generate();
+			final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder()
+				.createTestingTaskExecutorGateway();
+			final TaskExecutorConnection taskExecutionConnection1 = new TaskExecutorConnection(taskExecutorResourceId1, testingTaskExecutorGateway1);
+			final Set<SlotStatus> tm1SlotStatusList = new HashSet<>();
+			tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 0), ResourceProfile.UNKNOWN));
+			tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 1), ResourceProfile.UNKNOWN));
+
+			// register the task-manager-1 to the slot manager, this will trigger the slot allocation for job1.
+			slotManager.registerTaskManager(taskExecutionConnection1, new SlotReport(tm1SlotStatusList));
+
+			// register slot request for job2.
+			final JobID jobId2 = new JobID();
+			final SlotRequest slotRequest21 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+			final SlotRequest slotRequest22 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+			slotManager.registerSlotRequest(slotRequest21);
+			slotManager.registerSlotRequest(slotRequest22);
+
+			// register slot request for job3.
+			final JobID jobId3 = new JobID();
+			final SlotRequest slotRequest31 = new SlotRequest(jobId3, new AllocationID(), ResourceProfile.UNKNOWN, "foobar3");
+			slotManager.registerSlotRequest(slotRequest31);
+
+			// create task-manager-2 with 4 slots (only 3 of them are requested by job2 and job3).
+			final ResourceID taskExecutorResourceId2 = ResourceID.generate();
+			final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder()
+				.createTestingTaskExecutorGateway();
+			final TaskExecutorConnection taskExecutionConnection2 = new TaskExecutorConnection(taskExecutorResourceId2, testingTaskExecutorGateway2);
+			final Set<SlotStatus> tm2SlotStatusList = new HashSet<>();
+			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 0), ResourceProfile.UNKNOWN));
+			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 1), ResourceProfile.UNKNOWN));
+			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 2), ResourceProfile.UNKNOWN));
+			tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 3), ResourceProfile.UNKNOWN));
+
+			// register the task-manager-2 to the slot manager, this will trigger the slot allocation for job2 and job3.
+			slotManager.registerTaskManager(taskExecutionConnection2, new SlotReport(tm2SlotStatusList));
+
+			// --------------------- validate the notifications on task manager termination ------------------------
+
+			// validate the notifications for job1.
+			slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID());
+
+			assertEquals(2, notifiedTaskManagerInfos.size());
+
+			assertThat(notifiedTaskManagerInfos.get(0).f0, equalTo(jobId1));
+			assertThat(notifiedTaskManagerInfos.get(1).f0, equalTo(jobId1));
+
+			assertEquals(Stream.of(slotRequest11.getAllocationId(), slotRequest12.getAllocationId()).collect(Collectors.toSet()),
+				Stream.of(notifiedTaskManagerInfos.get(0).f1, notifiedTaskManagerInfos.get(1).f1).collect(Collectors.toSet()));
+
+			notifiedTaskManagerInfos.clear();
+
+			// validate the notifications for job2 and job3.
+			slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID());
+
+			assertEquals(3, notifiedTaskManagerInfos.size());
+
+			final Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = notifiedTaskManagerInfos.stream().collect(Collectors.groupingBy(tuple -> tuple.f0));
+
+			assertEquals(2, job2AndJob3FailedAllocationInfo.size());
+
+			// validate for job2
+			assertEquals(Stream.of(slotRequest21.getAllocationId(), slotRequest22.getAllocationId()).collect(Collectors.toSet()),
+				job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
+
+			// validate for job3
+			assertEquals(Stream.of(slotRequest31.getAllocationId()).collect(Collectors.toSet()),
+				job2AndJob3FailedAllocationInfo.get(jobId3).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
+		}
+	}
+
 	private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
 		SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),