Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/04 10:43:31 UTC

[GitHub] [flink] wanglijie95 opened a new pull request, #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

wanglijie95 opened a new pull request, #20153:
URL: https://github.com/apache/flink/pull/20153

   ## What is the purpose of the change
   SlotPool should avoid allocating slots that are located on blocked nodes. To do that, the core idea is to keep the SlotPool in the following state: no slot in the SlotPool is both free (no task assigned) and located on a blocked node. The details are as follows:
   
   1. When receiving slot offers from task managers located on blocked nodes, all offers should be rejected.
   2. When a node is newly blocked, we should release all free (no task assigned) slots on it. We need to find all task managers on blocked nodes and release all free slots on them via SlotPoolService#releaseFreeSlotsOnTaskManager.
   3. When a slot's state changes from reserved (task assigned) to free (no task assigned), the SlotPool checks whether the corresponding task manager is blocked. If so, the slot is released.
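
The three rules above can be sketched as a small, self-contained model. This is only an illustration under stated assumptions: the class `BlocklistSlotPoolSketch`, its string-based slot and task manager IDs, and the `Predicate`-based checker are hypothetical and are not the classes touched by this pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical, simplified model of the rules above; this is not Flink's slot pool API. */
final class BlocklistSlotPoolSketch {

    /** A slot owned by one task manager; it is free unless a task is assigned. */
    private static final class Slot {
        final String taskManagerId;
        boolean reserved;

        Slot(String taskManagerId) {
            this.taskManagerId = taskManagerId;
        }
    }

    private final Map<String, Slot> slots = new HashMap<>();
    private final Predicate<String> blockedTaskManagerChecker;

    BlocklistSlotPoolSketch(Predicate<String> blockedTaskManagerChecker) {
        this.blockedTaskManagerChecker = blockedTaskManagerChecker;
    }

    /** Rule 1: offers from a blocked task manager are rejected as a whole. */
    List<String> offerSlots(String taskManagerId, List<String> slotIds) {
        if (blockedTaskManagerChecker.test(taskManagerId)) {
            return Collections.emptyList();
        }
        for (String slotId : slotIds) {
            slots.put(slotId, new Slot(taskManagerId));
        }
        return new ArrayList<>(slotIds);
    }

    /** Assigns a task to a free slot. */
    void reserve(String slotId) {
        Slot slot = slots.get(slotId);
        if (slot == null || slot.reserved) {
            throw new IllegalStateException("Slot is not free: " + slotId);
        }
        slot.reserved = true;
    }

    /** Rule 2: when a node becomes blocked, drop its free slots; reserved ones stay. */
    void releaseFreeSlotsOnTaskManager(String taskManagerId) {
        slots.values().removeIf(s -> !s.reserved && s.taskManagerId.equals(taskManagerId));
    }

    /** Rule 3: a slot that turns free is released at once if its task manager is blocked. */
    void freeReservedSlot(String slotId) {
        Slot slot = slots.get(slotId);
        if (slot == null) {
            return;
        }
        slot.reserved = false;
        if (blockedTaskManagerChecker.test(slot.taskManagerId)) {
            slots.remove(slotId);
        }
    }

    boolean containsSlot(String slotId) {
        return slots.containsKey(slotId);
    }
}
```

With these three entry points, the pool in this sketch never holds a slot that is both free and owned by a blocked task manager, which is the invariant described above.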
   
   
   ## Verifying this change
   `BlocklistDeclarativeSlotPoolTest`
   `DeclarativeSlotPoolServiceTest#testReleaseFreeSlotsOnTaskManager`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (**no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
     - The serializers: (**no**)
     - The runtime per-record code paths (performance sensitive): (**no**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**)
     - The S3 file system connector: (**no**)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**no**)
     - If yes, how is the feature documented? (**not applicable**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917229294


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1384,8 +1374,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(
-                        ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
-                        is(true));
+                                ExceptionUtils.findThrowable(e, IllegalArgumentException.class)

Review Comment:
   I think this code block can be replaced by
   ```
   Assertions.assertThatThrownBy(() -> unknownPartitionStateFuture.get())
                       .isInstanceOf(ExecutionException.class)
                       .satisfies(FlinkAssertions.anyCauseMatches(IllegalArgumentException.class));
   ```
   `unknownPartitionStateFuture` is a new local variable, as follows:
   ```
   CompletableFuture<ExecutionState> unknownPartitionStateFuture =
                       jobMasterGateway.requestPartitionState(
                               partition.getResultId(), new ResultPartitionID());
   ```
   This is needed because lambda expressions can only capture local variables that are final or effectively final.
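
The capture rule can be seen in a small, self-contained sketch. Everything here is a stand-in: `failingRequest` and `catchThrowable` are hypothetical helpers that only mimic a failing gateway call and an `assertThatThrownBy`-style helper, using nothing but the JDK.

```java
import java.util.concurrent.CompletableFuture;

final class EffectivelyFinalSketch {

    /** Hypothetical stand-in for a gateway call that fails asynchronously. */
    static CompletableFuture<String> failingRequest() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalArgumentException("unknown partition"));
        return future;
    }

    /** Hypothetical action type that may throw a checked exception. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    /** Runs the action and returns whatever it threw, or null if it completed normally. */
    static Throwable catchThrowable(ThrowingAction action) {
        try {
            action.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    static Throwable demo() {
        CompletableFuture<String> partitionStateFuture =
                CompletableFuture.completedFuture("RUNNING");
        // Reassigning the variable means it is no longer effectively final.
        partitionStateFuture = failingRequest();

        // catchThrowable(() -> partitionStateFuture.get()); // would not compile

        // A fresh local that is assigned exactly once can be captured by the lambda.
        final CompletableFuture<String> unknownPartitionStateFuture = partitionStateFuture;
        return catchThrowable(() -> unknownPartitionStateFuture.get());
    }
}
```

In this sketch `demo()` returns the `ExecutionException` thrown by `get()`, whose cause is the `IllegalArgumentException`.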





[GitHub] [flink] wanglijie95 commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917595775


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java:
##########
@@ -64,4 +68,23 @@ public interface BlocklistHandler {
      * @param blocklistListener the listener to deregister
      */
     void deregisterBlocklistListener(BlocklistListener blocklistListener);
+
+    /** Factory to instantiate {@link BlocklistHandler}. */

Review Comment:
   I will fix the commit message before merge.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917217609


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1384,8 +1374,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");

Review Comment:
   Let's replace the try-catch block with `assertThatThrownBy`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -959,20 +951,20 @@ public void testReconnectionAfterDisconnect() throws Exception {
             // wait for first registration attempt
             final JobMasterId firstRegistrationAttempt = registrationsQueue.take();
 
-            assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+            assertThat(firstRegistrationAttempt).isEqualTo(jobMasterId);
 
-            assertThat(registrationsQueue.isEmpty(), is(true));
+            assertThat(registrationsQueue.isEmpty()).isTrue();

Review Comment:
   can be `assertThat(registrationsQueue).isEmpty();`



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1384,8 +1374,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(
-                        ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
-                        is(true));
+                                ExceptionUtils.findThrowable(e, IllegalArgumentException.class)

Review Comment:
   Would `hasRootCauseInstanceOf(IllegalArgumentException.class)` work here?
   If not, we can still simplify it to be `assertThat(ExceptionUtils.findThrowable(e, IllegalArgumentException.class)).isPresent()`.
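
Whether a root-cause check is enough depends on where the expected exception sits in the cause chain. The following JDK-only sketch contrasts the two lookups; the helper names are hypothetical, and `rootCause` treats a throwable without a cause as its own root, which is a simplification chosen for this sketch.

```java
final class CauseChainSketch {

    /** True if the throwable or any of its causes is an instance of the given type. */
    static boolean anyCauseIsInstanceOf(Throwable throwable, Class<? extends Throwable> type) {
        for (Throwable current = throwable; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }

    /** Walks to the end of the cause chain (assumes the chain has no cycles). */
    static Throwable rootCause(Throwable throwable) {
        Throwable current = throwable;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
```

If the `IllegalArgumentException` is the innermost exception, both checks agree. If it wraps another exception itself, only the any-cause lookup still finds it.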



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1462,19 +1455,19 @@ public void testTriggerSavepointTimeout() throws Exception {
 
             try {
                 savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
-                fail();
+                fail("Expected TimeoutException");
             } catch (final ExecutionException e) {
                 final Throwable cause = ExceptionUtils.stripExecutionException(e);
-                assertThat(cause, instanceOf(TimeoutException.class));
+                assertThat(cause).isInstanceOf(TimeoutException.class);
             }
 
-            assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false)));
+            assertThat(savepointFutureHighTimeout.isDone()).isFalse();

Review Comment:
   can be `assertThat(savepointFutureHighTimeout).isNotDone();`



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1414,9 +1406,10 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(
-                        ExceptionUtils.findThrowable(e, PartitionProducerDisposedException.class)
-                                .isPresent(),
-                        is(true));

Review Comment:
   See above comment.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1398,8 +1389,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(

Review Comment:
   See above 2 comments.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1593,18 +1585,18 @@ public void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated(
 
             // we should free the slot, but not disconnect from the TaskExecutor as we still have an
             // allocated partition
-            assertThat(freedSlotFuture.get(), equalTo(allocationId));
+            assertThat(freedSlotFuture.get()).isEqualTo(allocationId);
 
             // trigger some request to guarantee ensure the slotAllocationFailure processing if
             // complete
             jobMasterGateway.requestJobStatus(Time.seconds(5)).get();
-            assertThat(disconnectTaskExecutorFuture.isDone(), is(false));
+            assertThat(disconnectTaskExecutorFuture.isDone()).isFalse();

Review Comment:
   can be `assertThat(disconnectTaskExecutorFuture).isNotDone();`



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1462,19 +1455,19 @@ public void testTriggerSavepointTimeout() throws Exception {
 
             try {
                 savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
-                fail();
+                fail("Expected TimeoutException");

Review Comment:
   better to use `assertThatThrownBy`



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java:
##########
@@ -230,14 +228,13 @@ public void testCreateAllocatedSlotReport() throws Exception {
                     declarativeSlotPoolService.createAllocatedSlotReport(
                             taskManagerLocation2.getResourceID());
 
-            assertThat(
-                    allocatedSlotReport.getAllocatedSlotInfos(),
-                    contains(matchesWithSlotContext(simpleSlotContext2)));
+            assertThat(allocatedSlotReport.getAllocatedSlotInfos())
+                    .is(matching(contains(matchesWithSlotContext(simpleSlotContext2))));

Review Comment:
   This matcher is simple, so I'd prefer to drop it and rewrite the verification as:
   ```
   assertThat(allocatedSlotReport.getAllocatedSlotInfos())
           .allMatch(
                   context ->
                           context.getAllocationId()
                                           .equals(simpleSlotContext2.getAllocationId())
                                   && context.getSlotIndex()
                                           == simpleSlotContext2.getPhysicalSlotNumber());
   ```
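
One difference may be worth keeping in mind when swapping the matcher for a predicate: Hamcrest's `contains(matcher)` expects exactly one element, while an all-match check over an empty collection passes vacuously. The JDK-only sketch below shows the gap; the helper names are hypothetical and only mirror the two styles of check.

```java
import java.util.List;
import java.util.function.Predicate;

final class AllMatchSketch {

    /** All-match style: true when no element violates the predicate, including for empty input. */
    static <T> boolean allMatch(List<T> items, Predicate<T> predicate) {
        return items.stream().allMatch(predicate);
    }

    /** Stricter style: exactly one element is present and it satisfies the predicate. */
    static <T> boolean containsExactlyOneMatching(List<T> items, Predicate<T> predicate) {
        return items.size() == 1 && predicate.test(items.get(0));
    }
}
```

If the stricter behaviour matters for a test, asserting the expected size next to the predicate check is one way to keep it.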





[GitHub] [flink] wanglijie95 commented on pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on PR #20153:
URL: https://github.com/apache/flink/pull/20153#issuecomment-1180043971

   Thanks for the review, @zhuzhurk @reswqa. I've addressed all the comments. Please take a look.




[GitHub] [flink] reswqa commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917229662


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1384,8 +1374,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(
-                        ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
-                        is(true));
+                                ExceptionUtils.findThrowable(e, IllegalArgumentException.class)

Review Comment:
   Another possible pattern is as follows:
   ```
   Assertions.assertThat(partitionStateFuture)
                       .failsWithin(0, TimeUnit.SECONDS)
                       .withThrowableOfType(ExecutionException.class)
                       .satisfies(FlinkAssertions.anyCauseMatches(IllegalArgumentException.class));
   ```
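
A zero timeout in this pattern only works when the future has already completed exceptionally. The JDK-only sketch below shows the underlying behaviour of a bounded `Future#get`, on the assumption that the assertion is built on it; `failureWithin` is a hypothetical helper, not an AssertJ or Flink method.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FailsWithinSketch {

    /** Returns the exception thrown by a bounded get(), or null if the future completed normally. */
    static Exception failureWithin(Future<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);
            return null;
        } catch (ExecutionException | TimeoutException e) {
            // ExecutionException: the future failed; TimeoutException: it was not done in time.
            return e;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before handing the exception back.
            Thread.currentThread().interrupt();
            return e;
        }
    }
}
```

An already failed future yields an `ExecutionException` here, while a future that is still pending yields a `TimeoutException`, so a type check for `ExecutionException` would not pass in the pending case.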





[GitHub] [flink] zhuzhurk closed pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
zhuzhurk closed pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.
URL: https://github.com/apache/flink/pull/20153




[GitHub] [flink] wanglijie95 commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917595351


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java:
##########
@@ -230,14 +228,13 @@ public void testCreateAllocatedSlotReport() throws Exception {
                     declarativeSlotPoolService.createAllocatedSlotReport(
                             taskManagerLocation2.getResourceID());
 
-            assertThat(
-                    allocatedSlotReport.getAllocatedSlotInfos(),
-                    contains(matchesWithSlotContext(simpleSlotContext2)));
+            assertThat(allocatedSlotReport.getAllocatedSlotInfos())
+                    .is(matching(contains(matchesWithSlotContext(simpleSlotContext2))));

Review Comment:
   Fixed





[GitHub] [flink] wanglijie95 commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917594508


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1384,8 +1374,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(
-                        ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
-                        is(true));
+                                ExceptionUtils.findThrowable(e, IllegalArgumentException.class)

Review Comment:
   I think `assertThatThrownBy(xxx).hasRootCauseInstanceOf` is enough.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1398,8 +1389,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(

Review Comment:
   Fixed



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1414,9 +1406,10 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(
-                        ExceptionUtils.findThrowable(e, PartitionProducerDisposedException.class)
-                                .isPresent(),
-                        is(true));

Review Comment:
   Fixed





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917816549


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedTaskManagerChecker.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+/** This checker helps to query whether a given task manager is blocked. */
+public interface BlockedTaskManagerChecker {
+
+    /**
+     * Returns whether the given task manager is located on blocked nodes.

Review Comment:
   The description of the return value has not been fixed yet.





[GitHub] [flink] flinkbot commented on pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20153:
URL: https://github.com/apache/flink/pull/20153#issuecomment-1173668339

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4538159eed67bf80ec52a9bfc12d28b97009af22",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4538159eed67bf80ec52a9bfc12d28b97009af22",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4538159eed67bf80ec52a9bfc12d28b97009af22 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] wanglijie95 commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917593936


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -959,20 +951,20 @@ public void testReconnectionAfterDisconnect() throws Exception {
             // wait for first registration attempt
             final JobMasterId firstRegistrationAttempt = registrationsQueue.take();
 
-            assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+            assertThat(firstRegistrationAttempt).isEqualTo(jobMasterId);
 
-            assertThat(registrationsQueue.isEmpty(), is(true));
+            assertThat(registrationsQueue.isEmpty()).isTrue();

Review Comment:
   Fixed



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1384,8 +1374,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");

Review Comment:
   Fixed





[GitHub] [flink] reswqa commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917229294


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1384,8 +1374,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(
-                        ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
-                        is(true));
+                                ExceptionUtils.findThrowable(e, IllegalArgumentException.class)

Review Comment:
   I think this code block can be replaced by
   ```
   Assertions.assertThatThrownBy(() -> unknownPartitionStateFuture.get())
                       .satisfies(FlinkAssertions.anyCauseMatches(IllegalArgumentException.class));
   ```





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917219894


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistUtils.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SlowTaskDetectorOptions;
+
+/** Utility class for blocklist. */
+public class BlocklistUtils {
+
+    public static BlocklistHandler.Factory loadBlocklistHandlerFactory(
+            Configuration configuration) {
+        if (isBlocklistEnabled(configuration)) {
+            return new DefaultBlocklistHandler.Factory(
+                    Time.fromDuration(configuration.get(SlowTaskDetectorOptions.CHECK_INTERVAL)));

Review Comment:
   Let's use `Duration` instead of `Time` in `DefaultBlocklistHandler`. The usage of `Time` is a legacy problem and should be avoided if possible.
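
As a rough illustration of the suggestion, a component can accept `java.time.Duration` directly, so the caller needs no conversion step. The class below is purely hypothetical and is not `DefaultBlocklistHandler`; it only shows the shape of such a constructor.

```java
import java.time.Duration;

/** Hypothetical periodic checker; it only illustrates taking java.time.Duration directly. */
final class DurationIntervalSketch {

    private final Duration checkInterval;

    DurationIntervalSketch(Duration checkInterval) {
        if (checkInterval.isZero() || checkInterval.isNegative()) {
            throw new IllegalArgumentException(
                    "Check interval must be positive: " + checkInterval);
        }
        this.checkInterval = checkInterval;
    }

    /** Delay in milliseconds, e.g. for passing to a scheduler. */
    long delayMillis() {
        return checkInterval.toMillis();
    }
}
```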



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java:
##########
@@ -64,4 +68,23 @@ public interface BlocklistHandler {
      * @param blocklistListener the listener to deregister
      */
     void deregisterBlocklistListener(BlocklistListener blocklistListener);
+
+    /** Factory to instantiate {@link BlocklistHandler}. */
+    interface Factory {
+
+        /**
+         * Instantiates an {@link BlocklistHandler}.

Review Comment:
   an -> a



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class BlocklistDeclarativeSlotPoolFactory implements DeclarativeSlotPoolFactory {

Review Comment:
   A Javadoc is required.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedTaskManagerChecker.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+/** This checker helps to query whether a given task manager is blocked. */
+public interface BlockedTaskManagerChecker {
+
+    /**
+     * Returns whether the given task manager is located on blocked nodes.

Review Comment:
   blocked nodes -> a blocked node



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java:
##########
@@ -64,4 +68,23 @@ public interface BlocklistHandler {
      * @param blocklistListener the listener to deregister
      */
     void deregisterBlocklistListener(BlocklistListener blocklistListener);
+
+    /** Factory to instantiate {@link BlocklistHandler}. */

Review Comment:
   This change should not be a hotfix. If it is needed, let's just mark it as part of FLINK-28144.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.ResourceCounter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.FreeSlotConsumer;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.NewSlotsService;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createResourceRequirements;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.drainNewSlotService;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool;
+import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.getOnlyElement;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.HamcrestCondition.matching;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/** Test for {@link BlocklistDeclarativeSlotPool}. */
+class BlocklistDeclarativeSlotPoolTest {
+
+    private static final ResourceProfile RESOURCE_PROFILE =
+            ResourceProfile.newBuilder().setCpuCores(1.7).build();
+
+    @Test
+    void testOfferSlotsFromBlockedTaskManager() throws Exception {
+        testOfferSlots(true);
+    }
+
+    @Test
+    void testOfferSlotsFromUnblockedTaskManager() throws Exception {
+        testOfferSlots(false);
+    }
+
+    private void testOfferSlots(boolean isBlocked) throws Exception {
+        final TaskManagerLocation taskManager = new LocalTaskManagerLocation();
+
+        final NewSlotsService notifyNewSlots = new NewSlotsService();
+        // mark task manager as blocked.
+        final BlocklistDeclarativeSlotPool slotPool =
+                BlocklistDeclarativeSlotPoolBuilder.builder()
+                        .setBlockedTaskManagerChecker(
+                                isBlocked ? taskManager.getResourceID()::equals : ignore -> false)
+                        .build();
+        slotPool.registerNewSlotsListener(notifyNewSlots);
+
+        final ResourceCounter resourceRequirements = createResourceRequirements();
+        slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+        // offer slots on the blocked task manager
+        Collection<SlotOffer> slotOffers =
+                createSlotOffersForResourceRequirements(resourceRequirements);
+
+        if (isBlocked) {
+            assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, taskManager)).isEmpty();
+            assertThat(drainNewSlotService(notifyNewSlots)).isEmpty();
+        } else {
+            assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, taskManager))
+                    .containsExactlyInAnyOrderElementsOf(slotOffers);
+            assertThat(drainNewSlotService(notifyNewSlots))
+                    .is(
+                            matching(

Review Comment:
   I would avoid using Hamcrest in new tests.
   One possible way to do the verification is to build a `Map<AllocationID, SlotOffer> slotOfferMap` from `slotOffers`, and introduce a method `boolean matchSlotToOffers(PhysicalSlot, SlotOffer)` and the assertion can be:
   ```
   assertThat(drainNewSlotService(notifyNewSlots)).allMatch(slot -> matchSlotToOffers(slot, slotOfferMap.remove(slot.getAllocationId())));
   ```
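
The suggestion above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `OfferStub` and `SlotStub` are hypothetical stand-ins for `SlotOffer` and `PhysicalSlot`, the compared fields are a guess at what the test cares about, and `slotsMatchOffers` is a made-up wrapper that also checks that no offer is left unmatched.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's SlotOffer; only the compared fields are modelled.
final class OfferStub {
    final String allocationId;
    final int slotIndex;
    final double cpuCores;

    OfferStub(String allocationId, int slotIndex, double cpuCores) {
        this.allocationId = allocationId;
        this.slotIndex = slotIndex;
        this.cpuCores = cpuCores;
    }
}

// Hypothetical stand-in for Flink's PhysicalSlot.
final class SlotStub {
    final String allocationId;
    final int physicalSlotNumber;
    final double cpuCores;

    SlotStub(String allocationId, int physicalSlotNumber, double cpuCores) {
        this.allocationId = allocationId;
        this.physicalSlotNumber = physicalSlotNumber;
        this.cpuCores = cpuCores;
    }
}

final class SlotOfferMatchingSketch {

    /** Returns true if the slot has the same id, index and profile as the offer. */
    static boolean matchSlotToOffers(SlotStub slot, OfferStub offer) {
        return offer != null
                && slot.allocationId.equals(offer.allocationId)
                && slot.physicalSlotNumber == offer.slotIndex
                && slot.cpuCores == offer.cpuCores;
    }

    /** Returns true if every slot matches a distinct offer and no offer is left over. */
    static boolean slotsMatchOffers(Collection<SlotStub> slots, Collection<OfferStub> offers) {
        // Index the offers by allocation id, as proposed in the review comment.
        Map<String, OfferStub> slotOfferMap = new HashMap<>();
        for (OfferStub offer : offers) {
            slotOfferMap.put(offer.allocationId, offer);
        }
        // remove(...) consumes each offer, so a duplicate slot cannot match twice.
        boolean allMatched =
                slots.stream()
                        .allMatch(
                                slot ->
                                        matchSlotToOffers(
                                                slot, slotOfferMap.remove(slot.allocationId)));
        return allMatched && slotOfferMap.isEmpty();
    }
}
```

Removing each offer from the map while matching means the final `isEmpty()` check catches the case where fewer slots arrive than were offered, which a bare `allMatch` would not.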



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917229294


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1384,8 +1374,9 @@ public void testRequestPartitionState() throws Exception {
                 fail("Expected failure.");
             } catch (ExecutionException e) {
                 assertThat(
-                        ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
-                        is(true));
+                                ExceptionUtils.findThrowable(e, IllegalArgumentException.class)

Review Comment:
   I think this code block can be replaced by
   ```
   Assertions.assertThatThrownBy(() -> unknownPartitionStateFuture.get())
                       .isInstanceOf(ExecutionException.class)
                       .satisfies(FlinkAssertions.anyCauseMatches(IllegalArgumentException.class));
   ```
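
For readers unfamiliar with the pattern, the check that the suggested assertion expresses can be sketched without any test library. This is a minimal illustration, not the Flink or AssertJ implementation: `CauseChainSketch`, its `anyCauseMatches` helper and `thrownBy` are hypothetical names that only mimic the idea of "the call throws an `ExecutionException` whose cause chain contains an `IllegalArgumentException`".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class CauseChainSketch {

    /** Walks the cause chain and reports whether any link is an instance of the given type. */
    static boolean anyCauseMatches(Throwable error, Class<? extends Throwable> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    /** Returns whatever future.get() throws, or null if the future completes normally. */
    static Throwable thrownBy(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before handing the exception back.
            Thread.currentThread().interrupt();
            return e;
        } catch (ExecutionException e) {
            return e;
        }
    }
}
```

Compared with the `try` / `fail(...)` / `catch` form in the diff, capturing the thrown exception first and then inspecting it keeps the "must throw" and "must have this cause" checks separate, which is what the fluent assertion does in one chain.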





[GitHub] [flink] wanglijie95 commented on pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on PR #20153:
URL: https://github.com/apache/flink/pull/20153#issuecomment-1180312542

   Thanks for the review, @zhuzhurk! I have addressed the last comment and squashed the commits.




[GitHub] [flink] zhuzhurk commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917245184


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -1449,4 +1491,42 @@ public Void retrievePayload(ResourceID resourceID) {
             return null;
         }
     }
+
+    private class JobMasterBlocklistContext implements BlocklistContext {
+
+        @Override
+        public void blockResources(Collection<BlockedNode> blockedNodes) {
+            Map<String, Set<ResourceID>> taskManagersByNode =
+                    taskManagerToNode.entrySet().stream()
+                            .collect(
+                                    Collectors.groupingBy(
+                                            Map.Entry::getValue,
+                                            Collectors.mapping(
+                                                    Map.Entry::getKey, Collectors.toSet())));
+
+            Collection<ResourceID> blockedTaskMangers =

Review Comment:
   Given that this now iterates over all the registered task managers, we can do it in a simpler and possibly more performant way: first create a set of the blocked nodes, then check each task manager to see whether it is on one of them.
   Alternatively, we can maintain `taskManagersByNode` in JobMaster to make the process even more performant.
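
The first alternative could look roughly like the sketch below. It is an assumption-laden illustration rather than the code that ended up in the PR: plain strings stand in for `ResourceID` and for the node ids carried by `BlockedNode`, and `BlockedTaskManagerSketch.findBlockedTaskManagers` is a hypothetical helper name.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class BlockedTaskManagerSketch {

    /**
     * Returns the ids of all task managers located on a blocked node.
     *
     * <p>Keys of taskManagerToNode are task manager ids and values are node ids; both are
     * plain strings here, standing in for ResourceID and the node id of a BlockedNode.
     */
    static Set<String> findBlockedTaskManagers(
            Map<String, String> taskManagerToNode, Collection<String> blockedNodeIds) {
        // Build the lookup set once so each task manager costs one hash lookup.
        Set<String> blocked = new HashSet<>(blockedNodeIds);
        return taskManagerToNode.entrySet().stream()
                .filter(entry -> blocked.contains(entry.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }
}
```

This does a single pass over the registered task managers and avoids building the intermediate node-to-task-managers grouping on every call.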



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##########
@@ -234,6 +235,31 @@ public boolean releaseTaskManager(ResourceID taskManagerId, Exception cause) {
         return false;
     }
 
+    @Override
+    public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) {
+        assertHasBeenStarted();
+        if (isTaskManagerRegistered(taskManagerId)) {
+
+            Collection<AllocationID> freeSlots =
+                    declarativeSlotPool.getFreeSlotsInformation().stream()
+                            .filter(
+                                    slotInfo ->
+                                            slotInfo.getTaskManagerLocation()
+                                                    .getResourceID()
+                                                    .equals(taskManagerId))
+                            .map(SlotInfoWithUtilization::getAllocationId)
+                            .collect(Collectors.toSet());
+
+            for (AllocationID allocationID : freeSlots) {

Review Comment:
   allocationID -> allocationId



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -208,6 +217,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
 
     private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
 
+    private final Map<ResourceID, String> taskManagerToNode;

Review Comment:
   This field is not strictly needed. We can have a method instead:
   ```
   private String getNodeIdOfTaskManager(ResourceID taskManagerId) {
       return registeredTaskManagers.get(taskManagerId).getTaskManagerLocation().getNodeId();
   }
   ```
   Or even just create the function when `blocklistHandlerFactory.create(...)` is invoked.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20153: [FLINK-28144][runtime] Let JobMaster support blocklist.

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20153:
URL: https://github.com/apache/flink/pull/20153#discussion_r917218919


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java:
##########
@@ -230,14 +228,13 @@ public void testCreateAllocatedSlotReport() throws Exception {
                     declarativeSlotPoolService.createAllocatedSlotReport(
                             taskManagerLocation2.getResourceID());
 
-            assertThat(
-                    allocatedSlotReport.getAllocatedSlotInfos(),
-                    contains(matchesWithSlotContext(simpleSlotContext2)));
+            assertThat(allocatedSlotReport.getAllocatedSlotInfos())
+                    .is(matching(contains(matchesWithSlotContext(simpleSlotContext2))));

Review Comment:
   This matcher is simpler, so I would prefer to drop it and rewrite the verification as:
   ```
   assertThat(allocatedSlotReport.getAllocatedSlotInfos())
           .allMatch(
                   context ->
                           context.getAllocationId()
                                           .equals(simpleSlotContext2.getAllocationId())
                                   && context.getSlotIndex()
                                           == simpleSlotContext2.getPhysicalSlotNumber());
   ```
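
One point that may be worth keeping in mind with this rewrite: an all-match predicate holds vacuously on an empty collection, whereas Hamcrest's `contains(...)` also pins the number of elements. The sketch below illustrates the difference with plain Java streams. `ReportedSlotStub` and both helper methods are hypothetical and only model the two compared fields.

```java
import java.util.Collection;

// Hypothetical stand-in for an allocated slot info entry in the report.
final class ReportedSlotStub {
    final String allocationId;
    final int slotIndex;

    ReportedSlotStub(String allocationId, int slotIndex) {
        this.allocationId = allocationId;
        this.slotIndex = slotIndex;
    }
}

final class AllMatchSketch {

    /** True if every reported slot has the expected id and index; also true when empty. */
    static boolean allMatch(
            Collection<ReportedSlotStub> infos, String expectedId, int expectedIndex) {
        return infos.stream()
                .allMatch(
                        info ->
                                info.allocationId.equals(expectedId)
                                        && info.slotIndex == expectedIndex);
    }

    /** True only if there is exactly one reported slot and it has the expected id and index. */
    static boolean containsExactly(
            Collection<ReportedSlotStub> infos, String expectedId, int expectedIndex) {
        return infos.size() == 1 && allMatch(infos, expectedId, expectedIndex);
    }
}
```

If the stricter behaviour is wanted, adding a size check next to the all-match assertion is one way to get it.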


