You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:27 UTC
[29/30] flink git commit: [FLINK-7870] [runtime] Slot pool cancels the slot request at the resource manager on timeout
[FLINK-7870] [runtime] Slot pool cancels the slot request at the resource manager on timeout
Summary: when a pending slot request in the slot pool fails (e.g. times out), the slot pool now cancels the corresponding slot request at the resource manager.
Test Plan: unit test
Reviewers: haitao.w
Differential Revision: https://aone.alibaba-inc.com/code/D320749
This closes #4887.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/902425f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/902425f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/902425f5
Branch: refs/heads/master
Commit: 902425f5752d0c80f155bc444de93ce250c7ce62
Parents: 26e3d37
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Tue Oct 17 17:57:18 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:45 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/instance/SlotPool.java | 7 +++++++
.../flink/runtime/resourcemanager/ResourceManager.java | 6 ++++++
.../runtime/resourcemanager/ResourceManagerGateway.java | 7 +++++++
.../resourcemanager/slotmanager/SlotManagerTest.java | 9 +++++++++
4 files changed, 29 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/902425f5/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 1944b38..12dbc63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -316,6 +316,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
+ future.whenComplete(
+ (value, throwable) -> {
+ if (throwable != null) {
+ resourceManagerGateway.cancelSlotRequest(allocationID);
+ }
+ });
+
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
jobMasterId,
new SlotRequest(jobId, allocationID, resources, jobManagerAddress),
http://git-wip-us.apache.org/repos/asf/flink/blob/902425f5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 42fed29..c8d7302 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -409,6 +409,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
@Override
+ public void cancelSlotRequest(AllocationID allocationID) {
+ // Slot allocation is asynchronous, so this cannot prevent every redundant slot allocation; cancellation is best effort.
+ slotManager.unregisterSlotRequest(allocationID);
+ }
+
+ @Override
public void notifySlotAvailable(
final InstanceID instanceID,
final SlotID slotId,
http://git-wip-us.apache.org/repos/asf/flink/blob/902425f5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 7b95de7..88538ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -76,6 +76,13 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
@RpcTimeout Time timeout);
/**
+ * Cancels (best effort) the pending slot request with the given allocation id at the resource manager.
+ *
+ * @param allocationID identifying the pending slot request to cancel
+ */
+ void cancelSlotRequest(AllocationID allocationID);
+
+ /**
* Register a {@link TaskExecutor} at the resource manager.
*
* @param taskExecutorAddress The address of the TaskExecutor that registers
http://git-wip-us.apache.org/repos/asf/flink/blob/902425f5/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index cf0aef9..55a9946 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -292,6 +292,15 @@ public class SlotManagerTest extends TestLogger {
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
+ // verify that if the request has not been assigned, should cancel the resource allocation
+ slotManager.registerSlotRequest(slotRequest);
+ PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId);
+ assertFalse(pendingSlotRequest.isAssigned());
+
+ slotManager.unregisterSlotRequest(allocationId);
+ pendingSlotRequest = slotManager.getSlotRequest(allocationId);
+ assertTrue(pendingSlotRequest == null);
+
slotManager.registerTaskManager(taskManagerConnection, slotReport);
TaskManagerSlot slot = slotManager.getSlot(slotId);