You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:27 UTC

[29/30] flink git commit: [FLINK-7870] [runtime] slot pool cancel slot request to resource manager if timeout

[FLINK-7870] [runtime] slot pool cancel slot request to resource manager if timeout

Summary: the slot pool cancels its pending slot request at the resource manager when the request times out (or otherwise fails)

Test Plan: unit test

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D320749

This closes #4887.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/902425f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/902425f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/902425f5

Branch: refs/heads/master
Commit: 902425f5752d0c80f155bc444de93ce250c7ce62
Parents: 26e3d37
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Tue Oct 17 17:57:18 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:45 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/instance/SlotPool.java    | 7 +++++++
 .../flink/runtime/resourcemanager/ResourceManager.java      | 6 ++++++
 .../runtime/resourcemanager/ResourceManagerGateway.java     | 7 +++++++
 .../resourcemanager/slotmanager/SlotManagerTest.java        | 9 +++++++++
 4 files changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/902425f5/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 1944b38..12dbc63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -316,6 +316,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
 
+		future.whenComplete(
+			(value, throwable) -> {
+				if (throwable != null) {
+					resourceManagerGateway.cancelSlotRequest(allocationID);
+				}
+			});
+
 		CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
 			jobMasterId,
 			new SlotRequest(jobId, allocationID, resources, jobManagerAddress),

http://git-wip-us.apache.org/repos/asf/flink/blob/902425f5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 42fed29..c8d7302 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -409,6 +409,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	@Override
+	public void cancelSlotRequest(AllocationID allocationID) {
+		// Slot allocation is asynchronous, so cancelling cannot prevent every redundant slot allocation; this is best effort only.
+		slotManager.unregisterSlotRequest(allocationID);
+	}
+
+	@Override
 	public void notifySlotAvailable(
 			final InstanceID instanceID,
 			final SlotID slotId,

http://git-wip-us.apache.org/repos/asf/flink/blob/902425f5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 7b95de7..88538ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -76,6 +76,13 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 		@RpcTimeout Time timeout);
 
 	/**
+	 * Cancels a pending slot allocation request at the resource manager.
+	 *
+	 * @param allocationID The allocation id of the slot request to cancel
+	 */
+	void cancelSlotRequest(AllocationID allocationID);
+
+	/**
 	 * Register a {@link TaskExecutor} at the resource manager.
 	 *
 	 * @param taskExecutorAddress The address of the TaskExecutor that registers

http://git-wip-us.apache.org/repos/asf/flink/blob/902425f5/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index cf0aef9..55a9946 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -292,6 +292,15 @@ public class SlotManagerTest extends TestLogger {
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
+			// verify that a slot request which has not been assigned yet can be cancelled and is removed from the slot manager
+			slotManager.registerSlotRequest(slotRequest);
+			PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId);
+			assertFalse(pendingSlotRequest.isAssigned());
+
+			slotManager.unregisterSlotRequest(allocationId);
+			pendingSlotRequest = slotManager.getSlotRequest(allocationId);
+			assertTrue(pendingSlotRequest == null);
+
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
 			TaskManagerSlot slot = slotManager.getSlot(slotId);