You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/08 14:21:48 UTC
[flink] 14/14: [FLINK-25817] Unify logic between TaskExecutor.requestSlot and TaskExecutor.tryLoadLocalAllocationSnapshots
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 00f2c62749206dff95ae6fd02f0e3d8b8530be89
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Feb 5 17:30:23 2022 +0100
[FLINK-25817] Unify logic between TaskExecutor.requestSlot and TaskExecutor.tryLoadLocalAllocationSnapshots
This commit unifies the logic between TaskExecutor.requestSlot and TaskExecutor.tryLoadLocalAllocationSnapshots.
This helps to reduce maintenance costs since both methods use the same logic.
This closes #18237.
---
.../flink/runtime/taskexecutor/TaskExecutor.java | 48 +++++++++++++---------
1 file changed, 28 insertions(+), 20 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7e4afb2..94a2262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1080,10 +1080,28 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
slotId, jobId, targetAddress, allocationId, resourceProfile));
try {
- allocateSlot(slotId, jobId, allocationId, resourceProfile);
- } catch (SlotAllocationException sae) {
- return FutureUtils.completedExceptionally(sae);
+ final boolean isConnected =
+ allocateSlotForJob(jobId, slotId, allocationId, resourceProfile, targetAddress);
+
+ if (isConnected) {
+ offerSlotsToJobManager(jobId);
+ }
+
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ } catch (SlotAllocationException e) {
+ log.debug("Could not allocate slot for allocation id {}.", allocationId, e);
+ return FutureUtils.completedExceptionally(e);
}
+ }
+
+ private boolean allocateSlotForJob(
+ JobID jobId,
+ SlotID slotId,
+ AllocationID allocationId,
+ ResourceProfile resourceProfile,
+ String targetAddress)
+ throws SlotAllocationException {
+ allocateSlot(slotId, jobId, allocationId, resourceProfile);
final JobTable.Job job;
@@ -1109,15 +1127,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
onFatalError(new Exception("Could not free slot " + slotId));
}
- return FutureUtils.completedExceptionally(
- new SlotAllocationException("Could not create new job.", e));
- }
-
- if (job.isConnected()) {
- offerSlotsToJobManager(jobId);
+ throw new SlotAllocationException("Could not create new job.", e);
}
- return CompletableFuture.completedFuture(Acknowledge.get());
+ return job.isConnected();
}
private TaskExecutorJobServices registerNewJobAndCreateServices(
@@ -2052,19 +2065,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
final Set<AllocationID> allocatedSlots = new HashSet<>();
for (SlotAllocationSnapshot slotAllocationSnapshot : slotAllocationSnapshots) {
try {
- allocateSlot(
- slotAllocationSnapshot.getSlotID(),
+ allocateSlotForJob(
slotAllocationSnapshot.getJobId(),
+ slotAllocationSnapshot.getSlotID(),
slotAllocationSnapshot.getAllocationId(),
- slotAllocationSnapshot.getResourceProfile());
+ slotAllocationSnapshot.getResourceProfile(),
+ slotAllocationSnapshot.getJobTargetAddress());
- jobTable.getOrCreateJob(
- slotAllocationSnapshot.getJobId(),
- () ->
- registerNewJobAndCreateServices(
- slotAllocationSnapshot.getJobId(),
- slotAllocationSnapshot.getJobTargetAddress()));
- } catch (Exception e) {
+ } catch (SlotAllocationException e) {
log.debug("Cannot reallocate restored slot {}.", slotAllocationSnapshot, e);
}