You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:55 UTC

[flink] 02/06: [FLINK-28588][rest] Add blocked flag in TaskManagerInfo and TaskManagerDetailsInfo

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 53938680abe846e1947243809b4b43d5f67610d3
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:06:10 2022 +0800

    [FLINK-28588][rest] Add blocked flag in TaskManagerInfo and TaskManagerDetailsInfo
---
 .../src/test/resources/rest_api_v1.snapshot        |  6 ++++
 .../runtime/resourcemanager/ResourceManager.java   |  7 +++--
 .../taskmanager/TaskManagerDetailsInfo.java        |  7 ++++-
 .../rest/messages/taskmanager/TaskManagerInfo.java | 32 ++++++++++++++++++----
 .../taskmanager/TaskManagerDetailsHandlerTest.java |  3 +-
 .../messages/taskmanager/TaskManagerInfoTest.java  |  3 +-
 6 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index f0e069087af..427f59c2723 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -3439,6 +3439,9 @@
                     "type" : "integer"
                   }
                 }
+              },
+              "blocked" : {
+                "type" : "boolean"
               }
             }
           }
@@ -3597,6 +3600,9 @@
             }
           }
         },
+        "blocked" : {
+          "type" : "boolean"
+        },
         "allocatedSlots" : {
           "type" : "array",
           "items" : {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 9a9a17b4cbc..fb85cf88231 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -646,7 +646,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                             slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()),
                             slotManager.getFreeResourceOf(taskExecutor.getInstanceID()),
                             taskExecutor.getHardwareDescription(),
-                            taskExecutor.getMemoryConfiguration()));
+                            taskExecutor.getMemoryConfiguration(),
+                            blocklistHandler.isBlockedTaskManager(taskExecutor.getResourceID())));
         }
 
         return CompletableFuture.completedFuture(taskManagerInfos);
@@ -675,7 +676,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                                     slotManager.getRegisteredResourceOf(instanceId),
                                     slotManager.getFreeResourceOf(instanceId),
                                     taskExecutor.getHardwareDescription(),
-                                    taskExecutor.getMemoryConfiguration()),
+                                    taskExecutor.getMemoryConfiguration(),
+                                    blocklistHandler.isBlockedTaskManager(
+                                            taskExecutor.getResourceID())),
                             slotManager.getAllocatedSlotsOf(instanceId));
 
             return CompletableFuture.completedFuture(taskManagerInfoWithSlots);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
index 657dfe7af8c..c9df8325e10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -33,6 +33,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgn
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.Objects;
 
@@ -64,6 +66,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
             @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked,
             @JsonProperty(FIELD_NAME_ALLOCATED_SLOTS) Collection<SlotInfo> allocatedSlots,
             @JsonProperty(FIELD_NAME_METRICS) TaskManagerMetricsInfo taskManagerMetrics) {
         super(
@@ -77,7 +80,8 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
                 totalResource,
                 freeResource,
                 hardwareDescription,
-                memoryConfiguration);
+                memoryConfiguration,
+                blocked);
 
         this.taskManagerMetrics = Preconditions.checkNotNull(taskManagerMetrics);
         this.allocatedSlots = Preconditions.checkNotNull(allocatedSlots);
@@ -98,6 +102,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
                 taskManagerInfoWithSlots.getTaskManagerInfo().getFreeResource(),
                 taskManagerInfoWithSlots.getTaskManagerInfo().getHardwareDescription(),
                 taskManagerInfoWithSlots.getTaskManagerInfo().getMemoryConfiguration(),
+                taskManagerInfoWithSlots.getTaskManagerInfo().getBlocked(),
                 taskManagerInfoWithSlots.getAllocatedSlots(),
                 taskManagerMetrics);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index b8d93ccd08d..308e49b504d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -31,10 +31,14 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -63,6 +67,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
 
     public static final String FIELD_NAME_MEMORY = "memoryConfiguration";
 
+    public static final String FIELD_NAME_BLOCKED = "blocked";
+
     private static final long serialVersionUID = 1L;
 
     @JsonProperty(FIELD_NAME_RESOURCE_ID)
@@ -99,7 +105,12 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
     @JsonProperty(FIELD_NAME_MEMORY)
     private final TaskExecutorMemoryConfiguration memoryConfiguration;
 
+    @JsonProperty(FIELD_NAME_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final boolean blocked;
+
     @JsonCreator
+    // blocked is @Nullable: Jackson passes null when the field is absent from the parsed JSON
     public TaskManagerInfo(
             @JsonDeserialize(using = ResourceIDDeserializer.class)
                     @JsonProperty(FIELD_NAME_RESOURCE_ID)
@@ -113,7 +124,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
             @JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo totalResource,
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
-            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration) {
+            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked) {
         this.resourceId = Preconditions.checkNotNull(resourceId);
         this.address = Preconditions.checkNotNull(address);
         this.dataPort = dataPort;
@@ -125,6 +137,7 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
         this.freeResource = freeResource;
         this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
         this.memoryConfiguration = Preconditions.checkNotNull(memoryConfiguration);
+        this.blocked = (blocked != null) && blocked;
     }
 
     public TaskManagerInfo(
@@ -138,7 +151,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
             ResourceProfile totalResource,
             ResourceProfile freeResource,
             HardwareDescription hardwareDescription,
-            TaskExecutorMemoryConfiguration memoryConfiguration) {
+            TaskExecutorMemoryConfiguration memoryConfiguration,
+            @Nullable Boolean blocked) {
         this(
                 resourceId,
                 address,
@@ -150,7 +164,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
                 ResourceProfileInfo.fromResrouceProfile(totalResource),
                 ResourceProfileInfo.fromResrouceProfile(freeResource),
                 hardwareDescription,
-                memoryConfiguration);
+                memoryConfiguration,
+                blocked);
     }
 
     @JsonIgnore
@@ -208,6 +223,11 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
         return memoryConfiguration;
     }
 
+    @JsonIgnore
+    public boolean getBlocked() {
+        return blocked;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -227,7 +247,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
                 && Objects.equals(resourceId, that.resourceId)
                 && Objects.equals(address, that.address)
                 && Objects.equals(hardwareDescription, that.hardwareDescription)
-                && Objects.equals(memoryConfiguration, that.memoryConfiguration);
+                && Objects.equals(memoryConfiguration, that.memoryConfiguration)
+                && blocked == that.blocked;
     }
 
     @Override
@@ -243,6 +264,7 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
                 totalResource,
                 freeResource,
                 hardwareDescription,
-                memoryConfiguration);
+                memoryConfiguration,
+                blocked);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index 95a64901af9..5e2e14864d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -172,7 +172,8 @@ class TaskManagerDetailsHandlerTest {
                 ResourceProfile.ZERO,
                 ResourceProfile.ZERO,
                 new HardwareDescription(0, 0L, 0L, 0L),
-                new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L));
+                new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L),
+                false);
     }
 
     private static HandlerRequest<EmptyRequestBody> createRequest() throws HandlerRequestException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
index 7231c94070b..140134ee953 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
@@ -65,6 +65,7 @@ public class TaskManagerInfoTest extends RestResponseMarshallingTestBase<TaskMan
                         random.nextLong(),
                         random.nextLong(),
                         random.nextLong(),
-                        random.nextLong()));
+                        random.nextLong()),
+                random.nextBoolean());
     }
 }