Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/27 07:47:02 UTC

[GitHub] [flink] wanglijie95 opened a new pull request, #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

wanglijie95 opened a new pull request, #20079:
URL: https://github.com/apache/flink/pull/20079

   ## What is the purpose of the change
   
   Introduce BlocklistHandler
   
   ## Verifying this change
   
   Unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (**no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
     - The serializers: (**no**)
     - The runtime per-record code paths (performance sensitive): (**no**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**)
     - The S3 file system connector: (**no**)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**no**)
     - If yes, how is the feature documented? (**not applicable**)
   




[GitHub] [flink] zhuzhurk closed pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
zhuzhurk closed pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler
URL: https://github.com/apache/flink/pull/20079




[GitHub] [flink] wanglijie95 commented on a diff in pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20079:
URL: https://github.com/apache/flink/pull/20079#discussion_r911836101


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Map<ResourceID, String> taskManagerToNode,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerToNode = checkNotNull(taskManagerToNode);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (!removedNodes.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Timeout removed blocked nodes: {}, details {}. "
+                                + "Total blocked nodes currently: {}, details: {}.",
+                        removedNodes.size(),
+                        removedNodes,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Timeout removed blocked nodes: {}."
+                                + " Total blocked nodes currently: {}.",
+                        removedNodes.size(),
+                        blocklistTracker.getAllBlockedNodes().size());
+            }
+            blocklistContext.unBlockResources(removedNodes);
+        }
+    }
+
+    private void assertRunningInMainThread() {
+        mainThreadExecutor.assertRunningInMainThread();
+    }
+
+    @Override
+    public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {
+        assertRunningInMainThread();
+
+        Collection<BlockedNode> newlyAddedOrMerged = blocklistTracker.addNewBlockedNodes(newNodes);
+        if (!newlyAddedOrMerged.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Newly added/merged blocked nodes: {}, details: {}."
+                                + " Total blocked nodes currently: {}, details: {}.",
+                        newlyAddedOrMerged.size(),
+                        newlyAddedOrMerged,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Newly added/merged blocked nodes: {}."
+                                + " Total blocked nodes currently: {}.",
+                        newlyAddedOrMerged.size(),
+                        blocklistTracker.getAllBlockedNodes().size());
+            }
+
+            blocklistListeners.forEach(
+                    listener -> listener.notifyNewBlockedNodes(newlyAddedOrMerged));
+            blocklistContext.blockResources(newlyAddedOrMerged);
+        }
+    }
+
+    @Override
+    public boolean isBlockedTaskManager(ResourceID taskManagerId) {
+        assertRunningInMainThread();
+
+        return taskManagerToNode.containsKey(taskManagerId)

Review Comment:
   You are right; we never expect it to fail to retrieve the node id of a task manager.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20079:
URL: https://github.com/apache/flink/pull/20079#discussion_r911668395


##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultBlocklistHandlerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private TestingComponentMainThreadExecutor mainThreadExecutor;
+
+    @BeforeEach
+    public void before() {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   you can use 
   ```
       @RegisterExtension
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
               TestingUtils.defaultExecutorExtension();
   ```
   and the executor can be `EXECUTOR_RESOURCE.getExecutor()`.
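   
   For reference, a minimal sketch of the resulting test setup under this suggestion — the `TestingUtils`/`TestExecutorExtension` import paths are assumptions, the rest follows the quoted diff:
   ```
   import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
   import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
   import org.apache.flink.testutils.TestingUtils;                   // assumed import path
   import org.apache.flink.testutils.executor.TestExecutorExtension; // assumed import path

   import org.junit.jupiter.api.BeforeEach;
   import org.junit.jupiter.api.extension.RegisterExtension;

   import java.util.concurrent.ScheduledExecutorService;

   class DefaultBlocklistHandlerTest {

       // JUnit requires @RegisterExtension fields to be non-private; the extension creates
       // the executor and shuts it down, so no @AfterEach cleanup is needed.
       @RegisterExtension
       static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
               TestingUtils.defaultExecutorExtension();

       private TestingComponentMainThreadExecutor mainThreadExecutor;

       @BeforeEach
       void before() {
           mainThreadExecutor =
                   new TestingComponentMainThreadExecutor(
                           ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                                   EXECUTOR_RESOURCE.getExecutor()));
       }
   }
   ```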



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Map<ResourceID, String> taskManagerToNode,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerToNode = checkNotNull(taskManagerToNode);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (!removedNodes.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Timeout removed blocked nodes: {}, details {}. "
+                                + "Total blocked nodes currently: {}, details: {}.",
+                        removedNodes.size(),
+                        removedNodes,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Timeout removed blocked nodes: {}."
+                                + " Total blocked nodes currently: {}.",
+                        removedNodes.size(),
+                        blocklistTracker.getAllBlockedNodes().size());
+            }
+            blocklistContext.unBlockResources(removedNodes);
+        }
+    }
+
+    private void assertRunningInMainThread() {
+        mainThreadExecutor.assertRunningInMainThread();
+    }
+
+    @Override
+    public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {
+        assertRunningInMainThread();
+
+        Collection<BlockedNode> newlyAddedOrMerged = blocklistTracker.addNewBlockedNodes(newNodes);
+        if (!newlyAddedOrMerged.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Newly added/merged blocked nodes: {}, details: {}."

Review Comment:
   Newly added/merged blocked nodes: {} -> Newly added/merged {} blocked nodes
   
   I feel it is a bit misleading as it stands: the number could be read as the id of a node rather than the count of nodes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources. */
+public interface BlocklistContext {
+
+    /**
+     * Block resources on the nodes.
+     *
+     * @param blockedNodes the blocked nodes

Review Comment:
   the blocked nodes -> the nodes to block resources



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources. */
+public interface BlocklistContext {
+
+    /**
+     * Block resources on the nodes.
+     *
+     * @param blockedNodes the blocked nodes
+     */
+    void blockResources(Collection<BlockedNode> blockedNodes);
+
+    /**
+     * Unblock resources on the nodes.
+     *
+     * @param unBlockedNodes the unBlocked nodes

Review Comment:
   the unBlocked nodes -> the nodes to unblock resources



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * This class is responsible for managing all {@link BlockedNode}s and performing them on resources.
+ */
+public interface BlocklistHandler {
+
+    /**
+     * Block a node.
+     *
+     * @param nodeId ID of the node
+     * @param cause the cause for blocking the node
+     * @param endTimestamp the timestamp at which the node should be unblocked
+     */
+    default void blockNode(String nodeId, String cause, long endTimestamp) {

Review Comment:
   Maybe remove this method because it will not be used in the near future? The speculative scheduler may need to block nodes in bulk and therefore will use `addNewBlockedNodes`.
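   
   For illustration, a hypothetical caller blocking nodes in bulk through `addNewBlockedNodes` (node ids, cause and timeout are made up; in the quoted diff the `BlockedNode` constructors are package-private, so such a caller would live in the same package or go through a factory):
   ```
   long unblockAt = System.currentTimeMillis() + 60_000L; // e.g. block for one minute
   Collection<BlockedNode> slowNodes =
           Arrays.asList(
                   new BlockedNode("node-1", "slow task detected", unblockAt),
                   new BlockedNode("node-2", "slow task detected", unblockAt));
   blocklistHandler.addNewBlockedNodes(slowNodes);
   ```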



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Map<ResourceID, String> taskManagerToNode,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerToNode = checkNotNull(taskManagerToNode);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (!removedNodes.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Timeout removed blocked nodes: {}, details {}. "
+                                + "Total blocked nodes currently: {}, details: {}.",
+                        removedNodes.size(),
+                        removedNodes,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Timeout removed blocked nodes: {}."

Review Comment:
   Timeout removed blocked nodes: {}. -> Remove {} timeout blocked nodes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistTracker.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistTracker}. */
+public class DefaultBlocklistTracker implements BlocklistTracker {
+    private final Map<String, BlockedNode> blockedNodes = new HashMap<>();
+
+    /**
+     * Try to add a new blocked node record. If the node (identified by node id) already exists, the
+     * newly added one will be merged with the existing one.
+     *
+     * @param newNode the new blocked node record
+     * @return the changed record, or {@link Optional#empty()} if no change
+     */
+    private Optional<BlockedNode> tryAddOrMerge(BlockedNode newNode) {
+        checkNotNull(newNode);
+        final String nodeId = newNode.getNodeId();
+        final BlockedNode existingNode = blockedNodes.get(nodeId);
+
+        if (existingNode == null) {
+            blockedNodes.put(nodeId, newNode);
+            return Optional.of(newNode);
+        } else if (!existingNode.equals(newNode)) {
+            BlockedNode merged = BlockedNode.merge(existingNode, newNode);

Review Comment:
   If the merged node equals the existing node, it should not be recognized as changed.
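   
   For reference, one way the check could look — a sketch only, building on the quoted `tryAddOrMerge`, with the surrounding method otherwise unchanged:
   ```
   if (existingNode == null) {
       blockedNodes.put(nodeId, newNode);
       return Optional.of(newNode);
   }

   BlockedNode merged = BlockedNode.merge(existingNode, newNode);
   if (merged.equals(existingNode)) {
       // Merging changed nothing, so do not report this node as changed.
       return Optional.empty();
   }
   blockedNodes.put(nodeId, merged);
   return Optional.of(merged);
   ```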



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultBlocklistHandlerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private TestingComponentMainThreadExecutor mainThreadExecutor;
+
+    @BeforeEach
+    public void before() {

Review Comment:
   can be package private



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources. */
+public interface BlocklistContext {
+
+    /**
+     * Block resources on the nodes.
+     *
+     * @param blockedNodes the blocked nodes
+     */
+    void blockResources(Collection<BlockedNode> blockedNodes);
+
+    /**
+     * Unblock resources on the nodes.
+     *
+     * @param unBlockedNodes the unBlocked nodes
+     */
+    void unBlockResources(Collection<BlockedNode> unBlockedNodes);

Review Comment:
   unBlockResources -> unblockResources



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   not needed



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Map<ResourceID, String> taskManagerToNode,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerToNode = checkNotNull(taskManagerToNode);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (!removedNodes.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Timeout removed blocked nodes: {}, details {}. "

Review Comment:
   Timeout removed blocked nodes: {}, details {}. -> Remove {} timeout blocked nodes, details: {}.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Map<ResourceID, String> taskManagerToNode,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerToNode = checkNotNull(taskManagerToNode);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (!removedNodes.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Timeout removed blocked nodes: {}, details {}. "
+                                + "Total blocked nodes currently: {}, details: {}.",
+                        removedNodes.size(),
+                        removedNodes,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Timeout removed blocked nodes: {}."
+                                + " Total blocked nodes currently: {}.",
+                        removedNodes.size(),
+                        blocklistTracker.getAllBlockedNodes().size());
+            }
+            blocklistContext.unBlockResources(removedNodes);
+        }
+    }
+
+    private void assertRunningInMainThread() {
+        mainThreadExecutor.assertRunningInMainThread();
+    }
+
+    @Override
+    public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {
+        assertRunningInMainThread();
+
+        Collection<BlockedNode> newlyAddedOrMerged = blocklistTracker.addNewBlockedNodes(newNodes);
+        if (!newlyAddedOrMerged.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Newly added/merged blocked nodes: {}, details: {}."
+                                + " Total blocked nodes currently: {}, details: {}.",
+                        newlyAddedOrMerged.size(),
+                        newlyAddedOrMerged,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Newly added/merged blocked nodes: {}."
+                                + " Total blocked nodes currently: {}.",
+                        newlyAddedOrMerged.size(),
+                        blocklistTracker.getAllBlockedNodes().size());
+            }
+
+            blocklistListeners.forEach(
+                    listener -> listener.notifyNewBlockedNodes(newlyAddedOrMerged));
+            blocklistContext.blockResources(newlyAddedOrMerged);
+        }
+    }
+
+    @Override
+    public boolean isBlockedTaskManager(ResourceID taskManagerId) {
+        assertRunningInMainThread();
+
+        return taskManagerToNode.containsKey(taskManagerId)

Review Comment:
   I would suggest `checkState(taskManagerToNode.containsKey(taskManagerId))` because we never expect it to fail to retrieve the node id of a task manager.
   
   Or is there a race where `taskManagerToNode` has been cleaned up but `isBlockedTaskManager()` is invoked afterwards?
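   
   For reference, a sketch of what the `checkState` variant could look like (`checkState` comes from `org.apache.flink.util.Preconditions`, as already used in the quoted `BlockedNode`; the error message is illustrative):
   ```
   @Override
   public boolean isBlockedTaskManager(ResourceID taskManagerId) {
       assertRunningInMainThread();

       String nodeId = taskManagerToNode.get(taskManagerId);
       // A registered task manager is always expected to have a known node id.
       checkState(nodeId != null, "Failed to retrieve the node id of task manager %s.", taskManagerId);
       return blocklistTracker.isBlockedNode(nodeId);
   }
   ```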



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultBlocklistHandlerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private TestingComponentMainThreadExecutor mainThreadExecutor;
+
+    @BeforeEach
+    public void before() {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        mainThreadExecutor =
+                new TestingComponentMainThreadExecutor(
+                        ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                                scheduledExecutorService));
+    }
+
+    @AfterEach
+    public void after() {

Review Comment:
   can be package private



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;

Review Comment:
   I prefer it to be `Function<ResourceID, String> taskManagerNodeIdRetriever;`
   because the map is actually mutated from outside, and it is misleading to pass in a changing map.
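   
   A sketch of how that could look (class members only, using `java.util.function.Function`; the field name follows the suggestion above and the constructor would take the `Function` in place of the `Map`):
   ```
   private final Function<ResourceID, String> taskManagerNodeIdRetriever;

   @Override
   public boolean isBlockedTaskManager(ResourceID taskManagerId) {
       assertRunningInMainThread();

       // Resolve the node id lazily so the handler never relies on a snapshot of a mutating map.
       String nodeId = taskManagerNodeIdRetriever.apply(taskManagerId);
       return nodeId != null && blocklistTracker.isBlockedNode(nodeId);
   }
   ```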



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultBlocklistHandlerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private TestingComponentMainThreadExecutor mainThreadExecutor;
+
+    @BeforeEach
+    public void before() {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        mainThreadExecutor =

Review Comment:
   By using `ComponentMainThreadExecutorServiceAdapter.forMainThread()` as the `mainThreadExecutor`, the test's main thread is treated as the component main thread, so there is no need to wrap calls in `mainThreadExecutor.execute(...)`.
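   
   For reference, a sketch of the setup this would allow (only the executor changes; the rest of the test stays as in the quoted diff):
   ```
   // The test thread itself acts as the component main thread, so handler methods can be
   // called directly instead of via mainThreadExecutor.execute(...).
   private final ComponentMainThreadExecutor mainThreadExecutor =
           ComponentMainThreadExecutorServiceAdapter.forMainThread();
   ```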



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistTracker.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** A tracker for blocklist. */
+public interface BlocklistTracker {
+
+    /**
+     * Add new blocked node records. If a node (identified by node id) already exists, the newly
+     * added one will be merged with the existing one.
+     *
+     * @param newNodes the new blocked node records
+     * @return the changes of blocklist after execution, which will be synchronized to other JMs/RM
+     */
+    Collection<BlockedNode> addNewBlockedNodes(Collection<BlockedNode> newNodes);
+
+    /**
+     * Returns whether the given node is blocked.
+     *
+     * @param nodeId ID of the node to query
+     * @return true if the given node is blocked, otherwise false
+     */
+    boolean isBlockedNode(String nodeId);
+
+    /**
+     * Get all blocked node ids.
+     *
+     * @return a collection containing all blocked node ids
+     */
+    Collection<String> getAllBlockedNodeIds();

Review Comment:
   Collection<String> -> Set<String>
   
   From the later PoC, I see that the result will be treated as a `Set` (e.g. used to check whether it equals another set).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** This class represents a blocked node record. */
+public class BlockedNode implements Serializable {

Review Comment:
   A `serialVersionUID` should be added. See https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization.
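   
   For reference, the addition would be a single field (the value `1L` here is illustrative):
   ```
   private static final long serialVersionUID = 1L;
   ```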



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** This class represents a blocked node record. */
+public class BlockedNode implements Serializable {
+
+    private final String nodeId;
+
+    private final Set<String> causes = new HashSet<>();
+
+    private final long endTimestamp;
+
+    BlockedNode(String nodeId, String cause, long endTimestamp) {
+        this(nodeId, Collections.singleton(cause), endTimestamp);
+    }
+
+    BlockedNode(String nodeId, Collection<String> causes, long endTimestamp) {
+        this.nodeId = checkNotNull(nodeId);
+        this.causes.addAll(checkNotNull(causes));
+        this.endTimestamp = endTimestamp;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public Collection<String> getCauses() {
+        return causes;
+    }
+
+    public long getEndTimestamp() {
+        return endTimestamp;
+    }
+
+    public static BlockedNode merge(BlockedNode node, BlockedNode other) {
+        checkNotNull(node);
+        checkNotNull(other);
+        checkState(node.nodeId.equals(other.nodeId));
+
+        Set<String> mergedCauses = new HashSet<>();
+        mergedCauses.addAll(node.causes);
+        mergedCauses.addAll(other.causes);
+
+        long mergedEndTimestamp = Math.max(node.endTimestamp, other.endTimestamp);

Review Comment:
   How about just keeping the latest cause (the cause with the largest endTimestamp)?
   Keeping many historical causes is not very useful and can be misleading in some cases.
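   A hedged sketch of that alternative (illustrative only; field and accessor names are assumptions, not the PR's final code): the record would keep a single cause instead of a set of causes.
   
   ```java
   public class BlockedNode implements java.io.Serializable {
   
       private static final long serialVersionUID = 1L; // placeholder value
   
       private final String nodeId;
       private final String cause;        // only the latest cause is kept
       private final long endTimestamp;
   
       public BlockedNode(String nodeId, String cause, long endTimestamp) {
           this.nodeId = nodeId;
           this.cause = cause;
           this.endTimestamp = endTimestamp;
       }
   
       public String getNodeId() { return nodeId; }
       public String getCause() { return cause; }
       public long getEndTimestamp() { return endTimestamp; }
   }
   ```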





[GitHub] [flink] wanglijie95 commented on a diff in pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20079:
URL: https://github.com/apache/flink/pull/20079#discussion_r911835345


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources. */
+public interface BlocklistContext {
+
+    /**
+     * Block resources on the nodes.
+     *
+     * @param blockedNodes the blocked nodes
+     */
+    void blockResources(Collection<BlockedNode> blockedNodes);
+
+    /**
+     * Unblock resources on the nodes.
+     *
+     * @param unBlockedNodes the unBlocked nodes
+     */
+    void unBlockResources(Collection<BlockedNode> unBlockedNodes);

Review Comment:
   OK



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources. */
+public interface BlocklistContext {
+
+    /**
+     * Block resources on the nodes.
+     *
+     * @param blockedNodes the blocked nodes

Review Comment:
   OK



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources. */
+public interface BlocklistContext {
+
+    /**
+     * Block resources on the nodes.
+     *
+     * @param blockedNodes the blocked nodes
+     */
+    void blockResources(Collection<BlockedNode> blockedNodes);
+
+    /**
+     * Unblock resources on the nodes.
+     *
+     * @param unBlockedNodes the unBlocked nodes

Review Comment:
   OK





[GitHub] [flink] wanglijie95 commented on a diff in pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20079:
URL: https://github.com/apache/flink/pull/20079#discussion_r911834985


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** This class represents a blocked node record. */
+public class BlockedNode implements Serializable {
+
+    private final String nodeId;
+
+    private final Set<String> causes = new HashSet<>();
+
+    private final long endTimestamp;
+
+    BlockedNode(String nodeId, String cause, long endTimestamp) {
+        this(nodeId, Collections.singleton(cause), endTimestamp);
+    }
+
+    BlockedNode(String nodeId, Collection<String> causes, long endTimestamp) {
+        this.nodeId = checkNotNull(nodeId);
+        this.causes.addAll(checkNotNull(causes));
+        this.endTimestamp = endTimestamp;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public Collection<String> getCauses() {
+        return causes;
+    }
+
+    public long getEndTimestamp() {
+        return endTimestamp;
+    }
+
+    public static BlockedNode merge(BlockedNode node, BlockedNode other) {
+        checkNotNull(node);
+        checkNotNull(other);
+        checkState(node.nodeId.equals(other.nodeId));
+
+        Set<String> mergedCauses = new HashSet<>();
+        mergedCauses.addAll(node.causes);
+        mergedCauses.addAll(other.causes);
+
+        long mergedEndTimestamp = Math.max(node.endTimestamp, other.endTimestamp);

Review Comment:
   Makes sense. Assume the existing record is A and the new one is B, with A.endTimestamp == B.endTimestamp and A.cause != B.cause. Then I think the merged record should be [B.endTimestamp, B.cause]. WDYT?
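   Sketched in code (assuming the single-cause record sketched earlier in this thread; names are illustrative, not the PR's final implementation):
   
   ```java
   /**
    * Merges two records for the same node. The record with the larger endTimestamp wins;
    * on a tie, the newly added record (and therefore its cause) is preferred.
    */
   public static BlockedNode merge(BlockedNode existing, BlockedNode newlyAdded) {
       if (!existing.getNodeId().equals(newlyAdded.getNodeId())) {
           throw new IllegalStateException("Both records must refer to the same node");
       }
       return newlyAdded.getEndTimestamp() >= existing.getEndTimestamp() ? newlyAdded : existing;
   }
   ```
   
   With this tie-breaking rule, repeatedly reporting the same node simply refreshes the cause and deadline instead of accumulating a growing history.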





[GitHub] [flink] wanglijie95 commented on a diff in pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20079:
URL: https://github.com/apache/flink/pull/20079#discussion_r911835511


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * This class is responsible for managing all {@link BlockedNode}s and performing them on resources.
+ */
+public interface BlocklistHandler {
+
+    /**
+     * Block a node.
+     *
+     * @param nodeId ID of the node
+     * @param cause the cause for blocking the node
+     * @param endTimestamp the timestamp at which the node should be unblocked
+     */
+    default void blockNode(String nodeId, String cause, long endTimestamp) {

Review Comment:
   OK





[GitHub] [flink] wanglijie95 commented on pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on PR #20079:
URL: https://github.com/apache/flink/pull/20079#issuecomment-1172195743

   Thanks for the review @zhuzhurk. I've addressed all comments, please take a look.




[GitHub] [flink] flinkbot commented on pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20079:
URL: https://github.com/apache/flink/pull/20079#issuecomment-1167007602

   ## CI report:
   
   * 4e14674d225f6ec980382041d6b4ddde5df3121a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] zhuzhurk commented on a diff in pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20079:
URL: https://github.com/apache/flink/pull/20079#discussion_r911968694


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** This class represents a blocked node record. */
+public class BlockedNode implements Serializable {
+
+    private final String nodeId;
+
+    private final Set<String> causes = new HashSet<>();
+
+    private final long endTimestamp;
+
+    BlockedNode(String nodeId, String cause, long endTimestamp) {
+        this(nodeId, Collections.singleton(cause), endTimestamp);
+    }
+
+    BlockedNode(String nodeId, Collection<String> causes, long endTimestamp) {
+        this.nodeId = checkNotNull(nodeId);
+        this.causes.addAll(checkNotNull(causes));
+        this.endTimestamp = endTimestamp;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public Collection<String> getCauses() {
+        return causes;
+    }
+
+    public long getEndTimestamp() {
+        return endTimestamp;
+    }
+
+    public static BlockedNode merge(BlockedNode node, BlockedNode other) {
+        checkNotNull(node);
+        checkNotNull(other);
+        checkState(node.nodeId.equals(other.nodeId));
+
+        Set<String> mergedCauses = new HashSet<>();
+        mergedCauses.addAll(node.causes);
+        mergedCauses.addAll(other.causes);
+
+        long mergedEndTimestamp = Math.max(node.endTimestamp, other.endTimestamp);

Review Comment:
   Looks good.





[GitHub] [flink] wanglijie95 commented on a diff in pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20079:
URL: https://github.com/apache/flink/pull/20079#discussion_r911835119


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** This class represents a blocked node record. */
+public class BlockedNode implements Serializable {

Review Comment:
   OK



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistTracker.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistTracker}. */
+public class DefaultBlocklistTracker implements BlocklistTracker {
+    private final Map<String, BlockedNode> blockedNodes = new HashMap<>();
+
+    /**
+     * Try to add a new blocked node record. If the node (identified by node id) already exists, the
+     * newly added one will be merged with the existing one.
+     *
+     * @param newNode the new blocked node record
+     * @return the changed record, or {@link Optional#empty()} if no change
+     */
+    private Optional<BlockedNode> tryAddOrMerge(BlockedNode newNode) {
+        checkNotNull(newNode);
+        final String nodeId = newNode.getNodeId();
+        final BlockedNode existingNode = blockedNodes.get(nodeId);
+
+        if (existingNode == null) {
+            blockedNodes.put(nodeId, newNode);
+            return Optional.of(newNode);
+        } else if (!existingNode.equals(newNode)) {
+            BlockedNode merged = BlockedNode.merge(existingNode, newNode);

Review Comment:
   OK



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistTracker.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** A tracker for blocklist. */
+public interface BlocklistTracker {
+
+    /**
+     * Add new blocked node records. If a node (identified by node id) already exists, the newly
+     * added one will be merged with the existing one.
+     *
+     * @param newNodes the new blocked node records
+     * @return the changes of blocklist after execution, which will be synchronized to other JMs/RM
+     */
+    Collection<BlockedNode> addNewBlockedNodes(Collection<BlockedNode> newNodes);
+
+    /**
+     * Returns whether the given node is blocked.
+     *
+     * @param nodeId ID of the node to query
+     * @return true if the given node is blocked, otherwise false
+     */
+    boolean isBlockedNode(String nodeId);
+
+    /**
+     * Get all blocked node ids.
+     *
+     * @return a collection containing all blocked node ids
+     */
+    Collection<String> getAllBlockedNodeIds();

Review Comment:
   OK


