You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:55 UTC
[flink] 02/06: [FLINK-28588][rest] Add blocked flag in TaskManagerInfo and TaskManagerDetailsInfo
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 53938680abe846e1947243809b4b43d5f67610d3
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:06:10 2022 +0800
[FLINK-28588][rest] Add blocked flag in TaskManagerInfo and TaskManagerDetailsInfo
---
.../src/test/resources/rest_api_v1.snapshot | 6 ++++
.../runtime/resourcemanager/ResourceManager.java | 7 +++--
.../taskmanager/TaskManagerDetailsInfo.java | 7 ++++-
.../rest/messages/taskmanager/TaskManagerInfo.java | 32 ++++++++++++++++++----
.../taskmanager/TaskManagerDetailsHandlerTest.java | 3 +-
.../messages/taskmanager/TaskManagerInfoTest.java | 3 +-
6 files changed, 48 insertions(+), 10 deletions(-)
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index f0e069087af..427f59c2723 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -3439,6 +3439,9 @@
"type" : "integer"
}
}
+ },
+ "blocked" : {
+ "type" : "boolean"
}
}
}
@@ -3597,6 +3600,9 @@
}
}
},
+ "blocked" : {
+ "type" : "boolean"
+ },
"allocatedSlots" : {
"type" : "array",
"items" : {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 9a9a17b4cbc..fb85cf88231 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -646,7 +646,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()),
slotManager.getFreeResourceOf(taskExecutor.getInstanceID()),
taskExecutor.getHardwareDescription(),
- taskExecutor.getMemoryConfiguration()));
+ taskExecutor.getMemoryConfiguration(),
+ blocklistHandler.isBlockedTaskManager(taskExecutor.getResourceID())));
}
return CompletableFuture.completedFuture(taskManagerInfos);
@@ -675,7 +676,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
slotManager.getRegisteredResourceOf(instanceId),
slotManager.getFreeResourceOf(instanceId),
taskExecutor.getHardwareDescription(),
- taskExecutor.getMemoryConfiguration()),
+ taskExecutor.getMemoryConfiguration(),
+ blocklistHandler.isBlockedTaskManager(
+ taskExecutor.getResourceID())),
slotManager.getAllocatedSlotsOf(instanceId));
return CompletableFuture.completedFuture(taskManagerInfoWithSlots);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
index 657dfe7af8c..c9df8325e10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -33,6 +33,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgn
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.Objects;
@@ -64,6 +66,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
@JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
@JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+ @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked,
@JsonProperty(FIELD_NAME_ALLOCATED_SLOTS) Collection<SlotInfo> allocatedSlots,
@JsonProperty(FIELD_NAME_METRICS) TaskManagerMetricsInfo taskManagerMetrics) {
super(
@@ -77,7 +80,8 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
totalResource,
freeResource,
hardwareDescription,
- memoryConfiguration);
+ memoryConfiguration,
+ blocked);
this.taskManagerMetrics = Preconditions.checkNotNull(taskManagerMetrics);
this.allocatedSlots = Preconditions.checkNotNull(allocatedSlots);
@@ -98,6 +102,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
taskManagerInfoWithSlots.getTaskManagerInfo().getFreeResource(),
taskManagerInfoWithSlots.getTaskManagerInfo().getHardwareDescription(),
taskManagerInfoWithSlots.getTaskManagerInfo().getMemoryConfiguration(),
+ taskManagerInfoWithSlots.getTaskManagerInfo().getBlocked(),
taskManagerInfoWithSlots.getAllocatedSlots(),
taskManagerMetrics);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index b8d93ccd08d..308e49b504d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -31,10 +31,14 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Objects;
@@ -63,6 +67,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
public static final String FIELD_NAME_MEMORY = "memoryConfiguration";
+ public static final String FIELD_NAME_BLOCKED = "blocked";
+
private static final long serialVersionUID = 1L;
@JsonProperty(FIELD_NAME_RESOURCE_ID)
@@ -99,7 +105,12 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
@JsonProperty(FIELD_NAME_MEMORY)
private final TaskExecutorMemoryConfiguration memoryConfiguration;
+ @JsonProperty(FIELD_NAME_BLOCKED)
+ @JsonInclude(Include.NON_DEFAULT)
+ private final boolean blocked;
+
@JsonCreator
+ // blocked is @Nullable because Jackson passes null when the "blocked" field is absent from the parsed JSON
public TaskManagerInfo(
@JsonDeserialize(using = ResourceIDDeserializer.class)
@JsonProperty(FIELD_NAME_RESOURCE_ID)
@@ -113,7 +124,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
@JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo totalResource,
@JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
@JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
- @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration) {
+ @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+ @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked) {
this.resourceId = Preconditions.checkNotNull(resourceId);
this.address = Preconditions.checkNotNull(address);
this.dataPort = dataPort;
@@ -125,6 +137,7 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
this.freeResource = freeResource;
this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
this.memoryConfiguration = Preconditions.checkNotNull(memoryConfiguration);
+ this.blocked = (blocked != null) && blocked;
}
public TaskManagerInfo(
@@ -138,7 +151,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
ResourceProfile totalResource,
ResourceProfile freeResource,
HardwareDescription hardwareDescription,
- TaskExecutorMemoryConfiguration memoryConfiguration) {
+ TaskExecutorMemoryConfiguration memoryConfiguration,
+ @Nullable Boolean blocked) {
this(
resourceId,
address,
@@ -150,7 +164,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
ResourceProfileInfo.fromResrouceProfile(totalResource),
ResourceProfileInfo.fromResrouceProfile(freeResource),
hardwareDescription,
- memoryConfiguration);
+ memoryConfiguration,
+ blocked);
}
@JsonIgnore
@@ -208,6 +223,11 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
return memoryConfiguration;
}
+ @JsonIgnore
+ public boolean getBlocked() {
+ return blocked;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -227,7 +247,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
&& Objects.equals(resourceId, that.resourceId)
&& Objects.equals(address, that.address)
&& Objects.equals(hardwareDescription, that.hardwareDescription)
- && Objects.equals(memoryConfiguration, that.memoryConfiguration);
+ && Objects.equals(memoryConfiguration, that.memoryConfiguration)
+ && blocked == that.blocked;
}
@Override
@@ -243,6 +264,7 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
totalResource,
freeResource,
hardwareDescription,
- memoryConfiguration);
+ memoryConfiguration,
+ blocked);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index 95a64901af9..5e2e14864d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -172,7 +172,8 @@ class TaskManagerDetailsHandlerTest {
ResourceProfile.ZERO,
ResourceProfile.ZERO,
new HardwareDescription(0, 0L, 0L, 0L),
- new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L));
+ new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L),
+ false);
}
private static HandlerRequest<EmptyRequestBody> createRequest() throws HandlerRequestException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
index 7231c94070b..140134ee953 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
@@ -65,6 +65,7 @@ public class TaskManagerInfoTest extends RestResponseMarshallingTestBase<TaskMan
random.nextLong(),
random.nextLong(),
random.nextLong(),
- random.nextLong()));
+ random.nextLong()),
+ random.nextBoolean());
}
}