You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/11 15:06:11 UTC

[flink] 03/10: [FLINK-17017][runtime] Allow to set whether a physical slot payload will occupy the slot indefinitely

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7054f97350efec83e3bf8b10b9b2c44f1a149cf1
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon May 18 12:40:53 2020 +0800

    [FLINK-17017][runtime] Allow to set whether a physical slot payload will occupy the slot indefinitely
---
 .../runtime/jobmaster/slotpool/PhysicalSlot.java   |  9 +++++++-
 .../jobmaster/slotpool/SingleLogicalSlot.java      | 24 +++++++++++++++++++++-
 .../jobmaster/slotpool/SlotSharingManager.java     |  5 +++++
 3 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java
index 54c2d8a..9c91801 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java
@@ -41,10 +41,17 @@ public interface PhysicalSlot extends SlotContext {
 	interface Payload {
 
 		/**
-		 * Releases the payload
+		 * Releases the payload.
 		 *
 		 * @param cause of the payload release
 		 */
 		void release(Throwable cause);
+
+		/**
+		 * Returns whether the payload will occupy a physical slot indefinitely.
+		 *
+		 * @return true if the payload will occupy a physical slot indefinitely, otherwise false
+		 */
+		boolean willOccupySlotIndefinitely();
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index 707bba9..e98b57e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
@@ -70,17 +71,33 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
 	// LogicalSlot.Payload of this slot
 	private volatile Payload payload;
 
+	/** Whether this logical slot will be occupied indefinitely. */
+	private boolean willBeOccupiedIndefinitely;
+
+	@VisibleForTesting
+	public SingleLogicalSlot(
+		SlotRequestId slotRequestId,
+		SlotContext slotContext,
+		@Nullable SlotSharingGroupId slotSharingGroupId,
+		Locality locality,
+		SlotOwner slotOwner) {
+
+		this(slotRequestId, slotContext, slotSharingGroupId, locality, slotOwner, true);
+	}
+
 	public SingleLogicalSlot(
 			SlotRequestId slotRequestId,
 			SlotContext slotContext,
 			@Nullable SlotSharingGroupId slotSharingGroupId,
 			Locality locality,
-			SlotOwner slotOwner) {
+			SlotOwner slotOwner,
+			boolean willBeOccupiedIndefinitely) {
 		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
 		this.slotContext = Preconditions.checkNotNull(slotContext);
 		this.slotSharingGroupId = slotSharingGroupId;
 		this.locality = Preconditions.checkNotNull(locality);
 		this.slotOwner = Preconditions.checkNotNull(slotOwner);
+		this.willBeOccupiedIndefinitely = willBeOccupiedIndefinitely;
 		this.releaseFuture = new CompletableFuture<>();
 
 		this.state = State.ALIVE;
@@ -168,6 +185,11 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
 		releaseFuture.complete(null);
 	}
 
+	@Override
+	public boolean willOccupySlotIndefinitely() {
+		return willBeOccupiedIndefinitely;
+	}
+
 	private void signalPayloadRelease(Throwable cause) {
 		tryAssignPayload(TERMINATED_PAYLOAD);
 		payload.fail(cause);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index bd0675a..28b0cb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -574,6 +574,11 @@ public class SlotSharingManager {
 		}
 
 		@Override
+		public boolean willOccupySlotIndefinitely() {
+			throw new UnsupportedOperationException("Shared slot are not allowed for slot occupation check.");
+		}
+
+		@Override
 		public ResourceProfile getReservedResources() {
 			return reservedResources;
 		}