You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/01 22:19:04 UTC
[2/2] flink git commit: [FLINK-9456] Let ResourceManager notify JobManager about failed/killed TaskManagers.
[FLINK-9456] Let ResourceManager notify JobManager about failed/killed TaskManagers.
This closes #6132.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50c0ea8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50c0ea8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50c0ea8c
Branch: refs/heads/master
Commit: 50c0ea8c9fe17278d45aba476a95791152a1420b
Parents: 432e48a
Author: sihuazhou <su...@163.com>
Authored: Thu May 10 14:36:27 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Jul 1 21:10:04 2018 +0200
----------------------------------------------------------------------
.../clusterframework/types/TaskManagerSlot.java | 19 +++-
.../flink/runtime/jobmaster/JobMaster.java | 5 +
.../runtime/jobmaster/JobMasterGateway.java | 8 ++
.../runtime/jobmaster/slotpool/SlotPool.java | 2 +-
.../resourcemanager/ResourceManager.java | 5 +
.../slotmanager/SlotManager.java | 45 +++++---
.../runtime/taskexecutor/TaskExecutor.java | 3 +-
.../exceptions/SlotOccupiedException.java | 16 ++-
.../utils/TestingJobMasterGateway.java | 5 +
.../slotmanager/SlotManagerTest.java | 103 ++++++++++++++++++-
10 files changed, 186 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
index fb7fce3..be39424 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
@@ -18,11 +18,14 @@
package org.apache.flink.runtime.clusterframework.types;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -45,6 +48,10 @@ public class TaskManagerSlot {
/** Allocation id for which this slot has been allocated. */
private AllocationID allocationId;
+ /** Job id for which this slot has been allocated. */
+ @Nullable
+ private JobID jobId;
+
/** Assigned slot request if there is currently an ongoing request. */
private PendingSlotRequest assignedSlotRequest;
@@ -83,6 +90,10 @@ public class TaskManagerSlot {
return allocationId;
}
+ public JobID getJobId() {
+ return jobId;
+ }
+
public PendingSlotRequest getAssignedSlotRequest() {
return assignedSlotRequest;
}
@@ -96,6 +107,7 @@ public class TaskManagerSlot {
state = State.FREE;
allocationId = null;
+ jobId = null;
}
public void clearPendingSlotRequest() {
@@ -112,21 +124,24 @@ public class TaskManagerSlot {
assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest);
}
- public void completeAllocation(AllocationID allocationId) {
+ public void completeAllocation(AllocationID allocationId, JobID jobId) {
Preconditions.checkNotNull(allocationId, "Allocation id must not be null.");
+ Preconditions.checkNotNull(jobId, "Job id must not be null.");
Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated.");
Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request.");
state = State.ALLOCATED;
this.allocationId = allocationId;
+ this.jobId = jobId;
assignedSlotRequest = null;
}
- public void updateAllocation(AllocationID allocationId) {
+ public void updateAllocation(AllocationID allocationId, JobID jobId) {
Preconditions.checkState(state == State.FREE, "The slot has to be free in order to set an allocation id.");
state = State.ALLOCATED;
this.allocationId = Preconditions.checkNotNull(allocationId);
+ this.jobId = Preconditions.checkNotNull(jobId);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e4a1b6a..7557bc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -980,6 +980,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
operatorBackPressureStats.orElse(null)));
}
+ @Override
+ public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
+ slotPool.failAllocation(allocationID, cause);
+ }
+
//----------------------------------------------------------------------------------------------
// Internal methods
//----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4ea9357..981222d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -278,4 +278,12 @@ public interface JobMasterGateway extends
* not available (yet).
*/
CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+ /**
+ * Notifies that the allocation has failed.
+ *
+ * @param allocationID the failed allocation id.
+ * @param cause the reason that the allocation failed
+ */
+ void notifyAllocationFailure(AllocationID allocationID, Exception cause);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 81b3e24..27440a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
allocatedSlot.releasePayload(cause);
}
else {
- log.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
+ log.trace("Outdated request to fail slot [{}] with ", allocationID, cause);
}
}
// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6e5c824..3ea5c2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1013,6 +1013,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
validateRunsInMainThread();
log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
+
+ JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
+ if (jobManagerRegistration != null) {
+ jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index fe503b2..d0d03f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -343,6 +344,7 @@ public class SlotManager implements AutoCloseable {
registerSlot(
slotStatus.getSlotID(),
slotStatus.getAllocationID(),
+ slotStatus.getJobID(),
slotStatus.getResourceProfile(),
taskExecutorConnection);
}
@@ -392,7 +394,7 @@ public class SlotManager implements AutoCloseable {
if (null != taskManagerRegistration) {
for (SlotStatus slotStatus : slotReport) {
- updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
+ updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID(), slotStatus.getJobID());
}
return true;
@@ -426,7 +428,7 @@ public class SlotManager implements AutoCloseable {
slot.getInstanceId() + " which has not been registered.");
}
- updateSlotState(slot, taskManagerRegistration, null);
+ updateSlotState(slot, taskManagerRegistration, null, null);
} else {
LOG.debug("Received request to free slot {} with expected allocation id {}, " +
"but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
@@ -515,6 +517,7 @@ public class SlotManager implements AutoCloseable {
private void registerSlot(
SlotID slotId,
AllocationID allocationId,
+ JobID jobId,
ResourceProfile resourceProfile,
TaskExecutorConnection taskManagerConnection) {
@@ -530,7 +533,7 @@ public class SlotManager implements AutoCloseable {
slots.put(slotId, slot);
- updateSlot(slotId, allocationId);
+ updateSlot(slotId, allocationId, jobId);
}
/**
@@ -540,14 +543,14 @@ public class SlotManager implements AutoCloseable {
* @param allocationId specifying the current allocation of the slot
* @return True if the slot could be updated; otherwise false
*/
- private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
+ private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) {
final TaskManagerSlot slot = slots.get(slotId);
if (slot != null) {
final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
if (taskManagerRegistration != null) {
- updateSlotState(slot, taskManagerRegistration, allocationId);
+ updateSlotState(slot, taskManagerRegistration, allocationId, jobId);
return true;
} else {
@@ -561,7 +564,11 @@ public class SlotManager implements AutoCloseable {
}
}
- private void updateSlotState(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId) {
+ private void updateSlotState(
+ TaskManagerSlot slot,
+ TaskManagerRegistration taskManagerRegistration,
+ @Nullable AllocationID allocationId,
+ @Nullable JobID jobId) {
if (null != allocationId) {
switch (slot.getState()) {
case PENDING:
@@ -575,12 +582,12 @@ public class SlotManager implements AutoCloseable {
// remove the pending slot request, since it has been completed
pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
- slot.completeAllocation(allocationId);
+ slot.completeAllocation(allocationId, jobId);
} else {
// we first have to free the slot in order to set a new allocationId
slot.clearPendingSlotRequest();
// set the allocation id such that the slot won't be considered for the pending slot request
- slot.updateAllocation(allocationId);
+ slot.updateAllocation(allocationId, jobId);
// this will try to find a new slot for the request
rejectPendingSlotRequest(
@@ -593,13 +600,13 @@ public class SlotManager implements AutoCloseable {
case ALLOCATED:
if (!Objects.equals(allocationId, slot.getAllocationId())) {
slot.freeSlot();
- slot.updateAllocation(allocationId);
+ slot.updateAllocation(allocationId, jobId);
}
break;
case FREE:
// the slot is currently free --> it is stored in freeSlots
freeSlots.remove(slot.getSlotId());
- slot.updateAllocation(allocationId);
+ slot.updateAllocation(allocationId, jobId);
taskManagerRegistration.occupySlot();
break;
}
@@ -660,15 +667,16 @@ public class SlotManager implements AutoCloseable {
final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
final AllocationID allocationId = pendingSlotRequest.getAllocationId();
final SlotID slotId = taskManagerSlot.getSlotId();
+ final InstanceID instanceID = taskManagerSlot.getInstanceId();
taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
pendingSlotRequest.setRequestFuture(completableFuture);
- TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
if (taskManagerRegistration == null) {
throw new IllegalStateException("Could not find a registered task manager for instance id " +
- taskManagerSlot.getInstanceId() + '.');
+ instanceID + '.');
}
taskManagerRegistration.markUsed();
@@ -695,11 +703,11 @@ public class SlotManager implements AutoCloseable {
(Acknowledge acknowledge, Throwable throwable) -> {
try {
if (acknowledge != null) {
- updateSlot(slotId, allocationId);
+ updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
} else {
if (throwable instanceof SlotOccupiedException) {
SlotOccupiedException exception = (SlotOccupiedException) throwable;
- updateSlot(slotId, exception.getAllocationId());
+ updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
} else {
removeSlotRequestFromSlot(slotId, allocationId);
}
@@ -765,8 +773,11 @@ public class SlotManager implements AutoCloseable {
}
AllocationID oldAllocationId = slot.getAllocationId();
-
- fulfilledSlotRequests.remove(oldAllocationId);
+ if (oldAllocationId != null) {
+ fulfilledSlotRequests.remove(oldAllocationId);
+ resourceActions.notifyAllocationFailure(
+ slot.getJobId(), oldAllocationId, new Exception("The assigned slot " + slot.getSlotId() + " was removed."));
+ }
} else {
LOG.debug("There was no slot registered with slot id {}.", slotId);
}
@@ -798,7 +809,7 @@ public class SlotManager implements AutoCloseable {
// clear the pending slot request
taskManagerSlot.clearPendingSlotRequest();
- updateSlotState(taskManagerSlot, taskManagerRegistration, null);
+ updateSlotState(taskManagerSlot, taskManagerRegistration, null, null);
} else {
LOG.debug("Ignore slot request removal for slot {}.", slotId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index dda2688..ae69e56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -762,7 +762,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
log.info(message);
- throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
+ final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
+ throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
}
if (jobManagerTable.contains(jobId)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
index 93e67a8..818754c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.taskexecutor.exceptions;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.util.Preconditions;
@@ -26,22 +27,31 @@ public class SlotOccupiedException extends SlotAllocationException {
private final AllocationID allocationId;
- public SlotOccupiedException(String message, AllocationID allocationId) {
+ private final JobID jobId;
+
+ public SlotOccupiedException(String message, AllocationID allocationId, JobID jobId) {
super(message);
this.allocationId = Preconditions.checkNotNull(allocationId);
+ this.jobId = jobId;
}
- public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId) {
+ public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId, JobID jobId) {
super(message, cause);
this.allocationId = Preconditions.checkNotNull(allocationId);
+ this.jobId = jobId;
}
- public SlotOccupiedException(Throwable cause, AllocationID allocationId) {
+ public SlotOccupiedException(Throwable cause, AllocationID allocationId, JobID jobId) {
super(cause);
this.allocationId = Preconditions.checkNotNull(allocationId);
+ this.jobId = jobId;
}
public AllocationID getAllocationId() {
return allocationId;
}
+
+ public JobID getJobId() {
+ return jobId;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 65117af..e887fc1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -166,6 +166,11 @@ public class TestingJobMasterGateway implements JobMasterGateway {
}
@Override
+ public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index af6f3e4..1b072d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -48,7 +49,12 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -57,6 +63,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@@ -1148,7 +1156,7 @@ public class SlotManagerTest extends TestLogger {
}
/**
- * Testst that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
+ * Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call
* fails.
*/
@Test
@@ -1202,6 +1210,99 @@ public class SlotManagerTest extends TestLogger {
}
}
+ /**
+ * Tests that the job manager is notified of the failed allocations when a task manager fails or is killed.
+ */
+ @Test
+ public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception {
+
+ final List<Tuple2<JobID, AllocationID>> notifiedTaskManagerInfos = new ArrayList<>();
+
+ try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
+ @Override
+ public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
+ notifiedTaskManagerInfos.add(new Tuple2<>(jobId, allocationId));
+ }})) {
+
+ // register slot request for job1.
+ JobID jobId1 = new JobID();
+ final SlotRequest slotRequest11 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+ final SlotRequest slotRequest12 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+ slotManager.registerSlotRequest(slotRequest11);
+ slotManager.registerSlotRequest(slotRequest12);
+
+ // create task-manager-1 with 2 slots.
+ final ResourceID taskExecutorResourceId1 = ResourceID.generate();
+ final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder()
+ .createTestingTaskExecutorGateway();
+ final TaskExecutorConnection taskExecutionConnection1 = new TaskExecutorConnection(taskExecutorResourceId1, testingTaskExecutorGateway1);
+ final Set<SlotStatus> tm1SlotStatusList = new HashSet<>();
+ tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 0), ResourceProfile.UNKNOWN));
+ tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 1), ResourceProfile.UNKNOWN));
+
+ // register the task-manager-1 to the slot manager, this will trigger the slot allocation for job1.
+ slotManager.registerTaskManager(taskExecutionConnection1, new SlotReport(tm1SlotStatusList));
+
+ // register slot request for job2.
+ JobID jobId2 = new JobID();
+ final SlotRequest slotRequest21 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+ final SlotRequest slotRequest22 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+ slotManager.registerSlotRequest(slotRequest21);
+ slotManager.registerSlotRequest(slotRequest22);
+
+ // register slot request for job3.
+ JobID jobId3 = new JobID();
+ final SlotRequest slotRequest31 = new SlotRequest(jobId3, new AllocationID(), ResourceProfile.UNKNOWN, "foobar3");
+ slotManager.registerSlotRequest(slotRequest31);
+
+ // create task-manager-2 with 4 slots.
+ final ResourceID taskExecutorResourceId2 = ResourceID.generate();
+ final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder()
+ .createTestingTaskExecutorGateway();
+ final TaskExecutorConnection taskExecutionConnection2 = new TaskExecutorConnection(taskExecutorResourceId2, testingTaskExecutorGateway2);
+ final Set<SlotStatus> tm2SlotStatusList = new HashSet<>();
+ tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 0), ResourceProfile.UNKNOWN));
+ tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 1), ResourceProfile.UNKNOWN));
+ tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 2), ResourceProfile.UNKNOWN));
+ tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 3), ResourceProfile.UNKNOWN));
+
+ // register the task-manager-2 to the slot manager, this will trigger the slot allocation for job2 and job3.
+ slotManager.registerTaskManager(taskExecutionConnection2, new SlotReport(tm2SlotStatusList));
+
+ // --------------------- validate the notifications after task manager termination ------------------------
+
+ // validate for job1.
+ slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID());
+
+ assertEquals(2, notifiedTaskManagerInfos.size());
+
+ assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(0).f0));
+ assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(1).f0));
+
+ assertEquals(Stream.of(slotRequest11.getAllocationId(), slotRequest12.getAllocationId()).collect(Collectors.toSet()),
+ Stream.of(notifiedTaskManagerInfos.get(0).f1, notifiedTaskManagerInfos.get(1).f1).collect(Collectors.toSet()));
+
+ notifiedTaskManagerInfos.clear();
+
+ // validate the result for job2 and job3.
+ slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID());
+
+ assertEquals(3, notifiedTaskManagerInfos.size());
+
+ Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = notifiedTaskManagerInfos.stream().collect(Collectors.groupingBy(tuple -> tuple.f0));
+
+ assertEquals(2, job2AndJob3FailedAllocationInfo.size());
+
+ // validate for job2
+ assertEquals(Stream.of(slotRequest21.getAllocationId(), slotRequest22.getAllocationId()).collect(Collectors.toSet()),
+ job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
+
+ // validate for job3
+ assertEquals(Stream.of(slotRequest31.getAllocationId()).collect(Collectors.toSet()),
+ job2AndJob3FailedAllocationInfo.get(jobId3).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
+ }
+ }
+
private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
SlotManager slotManager = new SlotManager(
TestingUtils.defaultScheduledExecutor(),