You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/11 15:06:11 UTC
[flink] 03/10: [FLINK-17017][runtime] Allow to set whether a physical slot payload will occupy the slot indefinitely
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7054f97350efec83e3bf8b10b9b2c44f1a149cf1
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon May 18 12:40:53 2020 +0800
[FLINK-17017][runtime] Allow to set whether a physical slot payload will occupy the slot indefinitely
---
.../runtime/jobmaster/slotpool/PhysicalSlot.java | 9 +++++++-
.../jobmaster/slotpool/SingleLogicalSlot.java | 24 +++++++++++++++++++++-
.../jobmaster/slotpool/SlotSharingManager.java | 5 +++++
3 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java
index 54c2d8a..9c91801 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java
@@ -41,10 +41,17 @@ public interface PhysicalSlot extends SlotContext {
interface Payload {
/**
- * Releases the payload
+ * Releases the payload.
*
* @param cause of the payload release
*/
void release(Throwable cause);
+
+ /**
+ * Returns whether the payload will occupy a physical slot indefinitely.
+ *
+ * @return true if the payload will occupy a physical slot indefinitely, otherwise false
+ */
+ boolean willOccupySlotIndefinitely();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index 707bba9..e98b57e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmaster.slotpool;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
@@ -70,17 +71,33 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
// LogicalSlot.Payload of this slot
private volatile Payload payload;
+ /** Whether this logical slot will be occupied indefinitely. */
+ private boolean willBeOccupiedIndefinitely;
+
+ @VisibleForTesting
+ public SingleLogicalSlot(
+ SlotRequestId slotRequestId,
+ SlotContext slotContext,
+ @Nullable SlotSharingGroupId slotSharingGroupId,
+ Locality locality,
+ SlotOwner slotOwner) {
+
+ this(slotRequestId, slotContext, slotSharingGroupId, locality, slotOwner, true);
+ }
+
public SingleLogicalSlot(
SlotRequestId slotRequestId,
SlotContext slotContext,
@Nullable SlotSharingGroupId slotSharingGroupId,
Locality locality,
- SlotOwner slotOwner) {
+ SlotOwner slotOwner,
+ boolean willBeOccupiedIndefinitely) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.slotContext = Preconditions.checkNotNull(slotContext);
this.slotSharingGroupId = slotSharingGroupId;
this.locality = Preconditions.checkNotNull(locality);
this.slotOwner = Preconditions.checkNotNull(slotOwner);
+ this.willBeOccupiedIndefinitely = willBeOccupiedIndefinitely;
this.releaseFuture = new CompletableFuture<>();
this.state = State.ALIVE;
@@ -168,6 +185,11 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
releaseFuture.complete(null);
}
+ @Override
+ public boolean willOccupySlotIndefinitely() {
+ return willBeOccupiedIndefinitely;
+ }
+
private void signalPayloadRelease(Throwable cause) {
tryAssignPayload(TERMINATED_PAYLOAD);
payload.fail(cause);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index bd0675a..28b0cb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -574,6 +574,11 @@ public class SlotSharingManager {
}
@Override
+ public boolean willOccupySlotIndefinitely() {
+ throw new UnsupportedOperationException("Shared slot are not allowed for slot occupation check.");
+ }
+
+ @Override
public ResourceProfile getReservedResources() {
return reservedResources;
}