Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/01 07:02:35 UTC

[GitHub] [flink] zhuzhurk commented on a diff in pull request #20079: [FLINK-28143][runtime] Introduce BlocklistHandler

zhuzhurk commented on code in PR #20079:
URL: https://github.com/apache/flink/pull/20079#discussion_r911668395


##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultBlocklistHandlerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private TestingComponentMainThreadExecutor mainThreadExecutor;
+
+    @BeforeEach
+    public void before() {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   you can use 
   ```
       @RegisterExtension
       private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
               TestingUtils.defaultExecutorExtension();
   ```
   and the executor can be `EXECUTOR_RESOURCE.getExecutor()`.
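
   For illustration, a minimal sketch of how `before()` could then look, reusing the `EXECUTOR_RESOURCE` field from the snippet above (the extension also takes care of shutting the executor down, so no manual cleanup in `@AfterEach` would be needed):
   ```
       @BeforeEach
       void before() {
           // Obtain the executor from the shared extension instead of creating and closing one per test.
           mainThreadExecutor =
                   new TestingComponentMainThreadExecutor(
                           ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                                   EXECUTOR_RESOURCE.getExecutor()));
       }
   ```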



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Map<ResourceID, String> taskManagerToNode,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerToNode = checkNotNull(taskManagerToNode);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (!removedNodes.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Timeout removed blocked nodes: {}, details {}. "
+                                + "Total blocked nodes currently: {}, details: {}.",
+                        removedNodes.size(),
+                        removedNodes,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Timeout removed blocked nodes: {}."
+                                + " Total blocked nodes currently: {}.",
+                        removedNodes.size(),
+                        blocklistTracker.getAllBlockedNodes().size());
+            }
+            blocklistContext.unBlockResources(removedNodes);
+        }
+    }
+
+    private void assertRunningInMainThread() {
+        mainThreadExecutor.assertRunningInMainThread();
+    }
+
+    @Override
+    public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {
+        assertRunningInMainThread();
+
+        Collection<BlockedNode> newlyAddedOrMerged = blocklistTracker.addNewBlockedNodes(newNodes);
+        if (!newlyAddedOrMerged.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Newly added/merged blocked nodes: {}, details: {}."

Review Comment:
   Newly added/merged blocked nodes: {} -> Newly added/merged {} blocked nodes
   
   I feel it is a bit misleading whether the number is the id of a node or the count of the nodes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources. */
+public interface BlocklistContext {
+
+    /**
+     * Block resources on the nodes.
+     *
+     * @param blockedNodes the blocked nodes

Review Comment:
   the blocked nodes -> the nodes to block resources on



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources. */
+public interface BlocklistContext {
+
+    /**
+     * Block resources on the nodes.
+     *
+     * @param blockedNodes the blocked nodes
+     */
+    void blockResources(Collection<BlockedNode> blockedNodes);
+
+    /**
+     * Unblock resources on the nodes.
+     *
+     * @param unBlockedNodes the unBlocked nodes

Review Comment:
   the unBlocked nodes -> the nodes to unblock resources on



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * This class is responsible for managing all {@link BlockedNode}s and performing them on resources.
+ */
+public interface BlocklistHandler {
+
+    /**
+     * Block a node.
+     *
+     * @param nodeId ID of the node
+     * @param cause the cause for blocking the node
+     * @param endTimestamp the timestamp at which the node should be unblocked
+     */
+    default void blockNode(String nodeId, String cause, long endTimestamp) {

Review Comment:
   Maybe remove this method because it will not be used in the near future? The speculative scheduler may need to block nodes in bulk and therefore will use `addNewBlockedNodes`.
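
   For what it's worth, a rough sketch of what such bulk usage from the scheduler side might look like (`blocklistHandler`, `detectedSlowNodes` and `endTimestamp` are illustrative names; it also assumes the caller can create `BlockedNode` records, while the constructors in this PR are package-private, so a public factory might be needed):
   ```
       List<BlockedNode> nodesToBlock = new ArrayList<>();
       for (String nodeId : detectedSlowNodes) {
           nodesToBlock.add(new BlockedNode(nodeId, "Node is detected to be slow.", endTimestamp));
       }
       blocklistHandler.addNewBlockedNodes(nodesToBlock);
   ```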



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Map<ResourceID, String> taskManagerToNode,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerToNode = checkNotNull(taskManagerToNode);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (!removedNodes.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Timeout removed blocked nodes: {}, details {}. "
+                                + "Total blocked nodes currently: {}, details: {}.",
+                        removedNodes.size(),
+                        removedNodes,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Timeout removed blocked nodes: {}."

Review Comment:
   Timeout removed blocked nodes: {}. -> Remove {} timeout blocked nodes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistTracker.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistTracker}. */
+public class DefaultBlocklistTracker implements BlocklistTracker {
+    private final Map<String, BlockedNode> blockedNodes = new HashMap<>();
+
+    /**
+     * Try to add a new blocked node record. If the node (identified by node id) already exists, the
+     * newly added one will be merged with the existing one.
+     *
+     * @param newNode the new blocked node record
+     * @return the changed record, or {@link Optional#empty()} if no change
+     */
+    private Optional<BlockedNode> tryAddOrMerge(BlockedNode newNode) {
+        checkNotNull(newNode);
+        final String nodeId = newNode.getNodeId();
+        final BlockedNode existingNode = blockedNodes.get(nodeId);
+
+        if (existingNode == null) {
+            blockedNodes.put(nodeId, newNode);
+            return Optional.of(newNode);
+        } else if (!existingNode.equals(newNode)) {
+            BlockedNode merged = BlockedNode.merge(existingNode, newNode);

Review Comment:
   If the merged node equals the existing node, it should not be recognized as changed.
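
   Something along these lines, as a rough sketch of how `tryAddOrMerge` could check whether merging actually changed anything (it assumes the rest of the branch stores the merged record, which is not shown in this hunk):
   ```
       } else if (!existingNode.equals(newNode)) {
           BlockedNode merged = BlockedNode.merge(existingNode, newNode);
           // Only report a change if merging produced a record different from the existing one.
           if (!merged.equals(existingNode)) {
               blockedNodes.put(nodeId, merged);
               return Optional.of(merged);
           }
       }
       return Optional.empty();
   ```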



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultBlocklistHandlerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private TestingComponentMainThreadExecutor mainThreadExecutor;
+
+    @BeforeEach
+    public void before() {

Review Comment:
   can be package private



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources. */
+public interface BlocklistContext {
+
+    /**
+     * Block resources on the nodes.
+     *
+     * @param blockedNodes the blocked nodes
+     */
+    void blockResources(Collection<BlockedNode> blockedNodes);
+
+    /**
+     * Unblock resources on the nodes.
+     *
+     * @param unBlockedNodes the unBlocked nodes
+     */
+    void unBlockResources(Collection<BlockedNode> unBlockedNodes);

Review Comment:
   unBlockResources -> unblockResources



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   not needed



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Map<ResourceID, String> taskManagerToNode,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerToNode = checkNotNull(taskManagerToNode);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (!removedNodes.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Timeout removed blocked nodes: {}, details {}. "

Review Comment:
   Timeout removed blocked nodes: {}, details {}. -> Remove {} timeout blocked nodes, details: {}.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Map<ResourceID, String> taskManagerToNode,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerToNode = checkNotNull(taskManagerToNode);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (!removedNodes.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Timeout removed blocked nodes: {}, details {}. "
+                                + "Total blocked nodes currently: {}, details: {}.",
+                        removedNodes.size(),
+                        removedNodes,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Timeout removed blocked nodes: {}."
+                                + " Total blocked nodes currently: {}.",
+                        removedNodes.size(),
+                        blocklistTracker.getAllBlockedNodes().size());
+            }
+            blocklistContext.unBlockResources(removedNodes);
+        }
+    }
+
+    private void assertRunningInMainThread() {
+        mainThreadExecutor.assertRunningInMainThread();
+    }
+
+    @Override
+    public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {
+        assertRunningInMainThread();
+
+        Collection<BlockedNode> newlyAddedOrMerged = blocklistTracker.addNewBlockedNodes(newNodes);
+        if (!newlyAddedOrMerged.isEmpty()) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "Newly added/merged blocked nodes: {}, details: {}."
+                                + " Total blocked nodes currently: {}, details: {}.",
+                        newlyAddedOrMerged.size(),
+                        newlyAddedOrMerged,
+                        blocklistTracker.getAllBlockedNodes().size(),
+                        blocklistTracker.getAllBlockedNodes());
+            } else {
+                log.info(
+                        "Newly added/merged blocked nodes: {}."
+                                + " Total blocked nodes currently: {}.",
+                        newlyAddedOrMerged.size(),
+                        blocklistTracker.getAllBlockedNodes().size());
+            }
+
+            blocklistListeners.forEach(
+                    listener -> listener.notifyNewBlockedNodes(newlyAddedOrMerged));
+            blocklistContext.blockResources(newlyAddedOrMerged);
+        }
+    }
+
+    @Override
+    public boolean isBlockedTaskManager(ResourceID taskManagerId) {
+        assertRunningInMainThread();
+
+        return taskManagerToNode.containsKey(taskManagerId)

Review Comment:
   I would suggest `checkState(taskManagerToNode.containsKey(taskManagerId))` because we never expect it to fail to retrieve the node id of a task manager.
   
   Or is there any race in which `taskManagerToNode` has been cleaned up but `isBlockedTaskManager()` is invoked after that?
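
   A minimal sketch of the `checkState` variant, assuming the method still consults the tracker with the mapped node id (the rest of the method body is not shown in this hunk):
   ```
       @Override
       public boolean isBlockedTaskManager(ResourceID taskManagerId) {
           assertRunningInMainThread();
   
           // Fail fast on an unknown task manager instead of silently treating it as not blocked.
           checkState(
                   taskManagerToNode.containsKey(taskManagerId),
                   "Unknown task manager %s.",
                   taskManagerId);
           return blocklistTracker.isBlockedNode(taskManagerToNode.get(taskManagerId));
       }
   ```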



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultBlocklistHandlerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private TestingComponentMainThreadExecutor mainThreadExecutor;
+
+    @BeforeEach
+    public void before() {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        mainThreadExecutor =
+                new TestingComponentMainThreadExecutor(
+                        ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                                scheduledExecutorService));
+    }
+
+    @AfterEach
+    public void after() {

Review Comment:
   can be package private



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link BlocklistHandler}. */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    private final Map<ResourceID, String> taskManagerToNode;

Review Comment:
   I prefer it to be `Function<ResourceID, String> taskManagerNodeIdRetriever;`
   Because the map actually changes from the outside, and it is misleading to pass in a changing map.
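
   Roughly, the shape could be as below (names are illustrative; the caller could then simply pass something like `taskManagerToNode::get` into the constructor):
   ```
       private final Function<ResourceID, String> taskManagerNodeIdRetriever;
   
       @Override
       public boolean isBlockedTaskManager(ResourceID taskManagerId) {
           assertRunningInMainThread();
   
           // Look up the node id at call time, so the handler does not hold on to a mutable map.
           String nodeId = taskManagerNodeIdRetriever.apply(taskManagerId);
           return nodeId != null && blocklistTracker.isBlockedNode(nodeId);
       }
   ```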



##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultBlocklistHandler}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultBlocklistHandlerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private TestingComponentMainThreadExecutor mainThreadExecutor;
+
+    @BeforeEach
+    public void before() {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        mainThreadExecutor =

Review Comment:
   By using `ComponentMainThreadExecutorServiceAdapter.forMainThread()` as the `mainThreadExecutor`, the test's main thread will be treated as the component main thread. Then there is no need to wrap calls in `mainThreadExecutor.execute(...)`.
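
   For illustration, a rough sketch of the simplified setup (`createDefaultBlocklistHandler(...)` stands in for however the test actually builds the handler):
   ```
       private final ComponentMainThreadExecutor mainThreadExecutor =
               ComponentMainThreadExecutorServiceAdapter.forMainThread();
   
       @Test
       void testAddNewBlockedNodes() {
           DefaultBlocklistHandler handler = createDefaultBlocklistHandler(mainThreadExecutor);
   
           // The test thread is treated as the main thread, so the handler can be called directly
           // instead of wrapping the call in mainThreadExecutor.execute(() -> ...).
           handler.addNewBlockedNodes(
                   Collections.singleton(new BlockedNode("node1", "Test cause", Long.MAX_VALUE)));
       }
   ```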



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistTracker.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** A tracker for blocklist. */
+public interface BlocklistTracker {
+
+    /**
+     * Add new blocked node records. If a node (identified by node id) already exists, the newly
+     * added one will be merged with the existing one.
+     *
+     * @param newNodes the new blocked node records
+     * @return the changes of blocklist after execution, which will be synchronized to other JMs/RM
+     */
+    Collection<BlockedNode> addNewBlockedNodes(Collection<BlockedNode> newNodes);
+
+    /**
+     * Returns whether the given node is blocked.
+     *
+     * @param nodeId ID of the node to query
+     * @return true if the given node is blocked, otherwise false
+     */
+    boolean isBlockedNode(String nodeId);
+
+    /**
+     * Get all blocked node ids.
+     *
+     * @return a collection containing all blocked node ids
+     */
+    Collection<String> getAllBlockedNodeIds();

Review Comment:
   Collection<String> -> Set<String>
   
   From the later PoC, I see that the result will be treated as a `Set` (e.g. used to check whether it equals another set).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** This class represents a blocked node record. */
+public class BlockedNode implements Serializable {

Review Comment:
   A `serialVersionUID` should be added. See https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization.
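
   For example (the concrete value is up to the author; any fixed long works):
   ```
       private static final long serialVersionUID = 1L;
   ```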



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** This class represents a blocked node record. */
+public class BlockedNode implements Serializable {
+
+    private final String nodeId;
+
+    private final Set<String> causes = new HashSet<>();
+
+    private final long endTimestamp;
+
+    BlockedNode(String nodeId, String cause, long endTimestamp) {
+        this(nodeId, Collections.singleton(cause), endTimestamp);
+    }
+
+    BlockedNode(String nodeId, Collection<String> causes, long endTimestamp) {
+        this.nodeId = checkNotNull(nodeId);
+        this.causes.addAll(checkNotNull(causes));
+        this.endTimestamp = endTimestamp;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public Collection<String> getCauses() {
+        return causes;
+    }
+
+    public long getEndTimestamp() {
+        return endTimestamp;
+    }
+
+    public static BlockedNode merge(BlockedNode node, BlockedNode other) {
+        checkNotNull(node);
+        checkNotNull(other);
+        checkState(node.nodeId.equals(other.nodeId));
+
+        Set<String> mergedCauses = new HashSet<>();
+        mergedCauses.addAll(node.causes);
+        mergedCauses.addAll(other.causes);
+
+        long mergedEndTimestamp = Math.max(node.endTimestamp, other.endTimestamp);

Review Comment:
   How about just keeping the latest cause (the cause with the largest endTimestamp)?
   Keeping a lot of historical causes is not good and can also be misleading in some cases.
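
   A minimal sketch of what `merge` could look like if only the latest cause is kept (the record with the larger `endTimestamp` wins):
   ```
       public static BlockedNode merge(BlockedNode node, BlockedNode other) {
           checkNotNull(node);
           checkNotNull(other);
           checkState(node.nodeId.equals(other.nodeId));
   
           // Keep only the record (and thus the cause) with the largest endTimestamp.
           return node.endTimestamp >= other.endTimestamp ? node : other;
       }
   ```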



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org