You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/06 12:39:51 UTC

[GitHub] [flink] tillrohrmann opened a new pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

tillrohrmann opened a new pull request #18286:
URL: https://github.com/apache/flink/pull/18286


   ## What is the purpose of the change
   
   This commit forwards the preferred allocations into the DeclarativeSlotPoolBridge so that
   new slots can be matched against the preferred allocations.
   
   Moreover, the commit introduces a RequestSlotMatchingStrategy that is used by the DeclarativeSlotPoolBridge
   to match slots to pending requests. If local recovery is enabled, then Flink will use the
   PreferredAllocationRequestSlotMatchingStrategy that first tries to match pending requests to slots based on
   their preferred allocation and then falls back to SimpleRequestSlotMatchingStrategy that matches solely on the
   ResourceProfile basis.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**yes** / no / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r782893415



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
##########
@@ -162,9 +163,30 @@ void start(
      * @return a newly allocated slot that was previously not available.
      */
     @Nonnull
+    default CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
+            @Nonnull SlotRequestId slotRequestId,
+            @Nonnull ResourceProfile resourceProfile,
+            @Nullable Time timeout) {
+        return requestNewAllocatedSlot(
+                slotRequestId, resourceProfile, Collections.emptyList(), timeout);
+    }
+
+    /**
+     * Request the allocation of a new slot from the resource manager. This method will not return a
+     * slot from the already available slots from the pool, but instead will add a new slot to that
+     * pool that is immediately allocated and returned.
+     *
+     * @param slotRequestId identifying the requested slot
+     * @param resourceProfile resource profile that specifies the resource requirements for the
+     *     requested slot
+     * @param preferredAllocations preferred allocations for the new allocated slot
+     * @param timeout timeout for the allocation procedure
+     * @return a newly allocated slot that was previously not available.
+     */
     CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
             @Nonnull SlotRequestId slotRequestId,
             @Nonnull ResourceProfile resourceProfile,
+            @Nonnull Collection<AllocationID> preferredAllocations,

Review comment:
       I would leave this for a follow-up task.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r783181844



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
##########
@@ -55,13 +59,26 @@
 import static org.junit.Assert.fail;
 
 /** Tests for the {@link DeclarativeSlotPoolBridge}. */
+@RunWith(Parameterized.class)
 public class DeclarativeSlotPoolBridgeTest extends TestLogger {
 
     private static final Time rpcTimeout = Time.seconds(20);
     private static final JobID jobId = new JobID();
     private static final JobMasterId jobMasterId = JobMasterId.generate();
     private final ComponentMainThreadExecutor mainThreadExecutor =
             ComponentMainThreadExecutorServiceAdapter.forMainThread();
+    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
+
+    @Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}")
+    public static Collection<RequestSlotMatchingStrategy> data() throws IOException {
+        return Arrays.asList(
+                SimpleRequestSlotMatchingStrategy.INSTANCE,
+                PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
+    }
+
+    public DeclarativeSlotPoolBridgeTest(RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+        this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;

Review comment:
       I am not sure whether it adds all that much additional benefit since we already have `PreferredAllocationRequestSlotMatchingStrategyTest` that tests the strategy. So what we are effectively testing is that the strategy is called. But maybe it is not too hard to add a test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006558939


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 04df41b87646cb470605c2b300f68293f49bacc7 (Thu Jan 06 12:45:24 UTC 2022)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006561511


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044",
       "triggerID" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29333",
       "triggerID" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93cdb5d6803fd363ba7d431173fc8ee78a7f4f8b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29472",
       "triggerID" : "93cdb5d6803fd363ba7d431173fc8ee78a7f4f8b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d1f9367d455c13cb079f03759cf9ec8d8e280f1e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29333) 
   * 93cdb5d6803fd363ba7d431173fc8ee78a7f4f8b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29472) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006561511


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044",
       "triggerID" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 04df41b87646cb470605c2b300f68293f49bacc7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044) 
   * d1f9367d455c13cb079f03759cf9ec8d8e280f1e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006561511


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044",
       "triggerID" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29333",
       "triggerID" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93cdb5d6803fd363ba7d431173fc8ee78a7f4f8b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29472",
       "triggerID" : "93cdb5d6803fd363ba7d431173fc8ee78a7f4f8b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 93cdb5d6803fd363ba7d431173fc8ee78a7f4f8b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29472) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r782896288



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import java.util.Collection;
+
+/** Strategy to match slot requests to slots. */
+public interface RequestSlotMatchingStrategy {

Review comment:
       At the moment this is not possible because we are using this interface in the parent package (for setting up things). We could refactor things but I am not sure whether it is worth the effort.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r782896288



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import java.util.Collection;
+
+/** Strategy to match slot requests to slots. */
+public interface RequestSlotMatchingStrategy {

Review comment:
       Yes, this makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r783157442



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
##########
@@ -184,6 +189,25 @@ public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(
                 slotPoolServiceFactory, schedulerNGFactory);
     }
 
+    private static RequestSlotMatchingStrategy getRequestSlotMatchingStrategy(

Review comment:
       I think this is a good idea. I will add a test for this method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r782905849



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -102,12 +103,16 @@ public SlotProfile getSlotProfile(
                         preferredLocationsRetriever.getPreferredLocations(
                                 execution, producersToIgnore));
             }
-            return SlotProfile.priorAllocation(
-                    physicalSlotResourceProfile,
-                    physicalSlotResourceProfile,
-                    preferredLocations,
-                    priorAllocations,
-                    reservedAllocationIds);
+
+            final SlotProfile slotProfile =

Review comment:
       I think this is a mistake. I will remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dmvk commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r784874058



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
##########
@@ -55,13 +59,26 @@
 import static org.junit.Assert.fail;
 
 /** Tests for the {@link DeclarativeSlotPoolBridge}. */
+@RunWith(Parameterized.class)
 public class DeclarativeSlotPoolBridgeTest extends TestLogger {
 
     private static final Time rpcTimeout = Time.seconds(20);
     private static final JobID jobId = new JobID();
     private static final JobMasterId jobMasterId = JobMasterId.generate();
     private final ComponentMainThreadExecutor mainThreadExecutor =
             ComponentMainThreadExecutorServiceAdapter.forMainThread();
+    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
+
+    @Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}")
+    public static Collection<RequestSlotMatchingStrategy> data() throws IOException {
+        return Arrays.asList(
+                SimpleRequestSlotMatchingStrategy.INSTANCE,
+                PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
+    }
+
+    public DeclarativeSlotPoolBridgeTest(RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+        this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;

Review comment:
       You're right, I've missed that when this test couldn't pass without the `RequestSlotMatchingStrategy` being called. 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006561511


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044",
       "triggerID" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29333",
       "triggerID" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d1f9367d455c13cb079f03759cf9ec8d8e280f1e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29333) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1013323221


   Thanks a lot for the review @dmvk. Rebasing this PR and merging once AZP gives green light.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006561511


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044",
       "triggerID" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 04df41b87646cb470605c2b300f68293f49bacc7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dmvk commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r781301276



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import java.util.Collection;
+
+/** Strategy to match slot requests to slots. */
+public interface RequestSlotMatchingStrategy {

Review comment:
       ```suggestion
   interface RequestSlotMatchingStrategy {
   ```
   
   Should this interface be package private as `PendingRequest` and methods of the `RequestSlotMatching` class are as well?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
##########
@@ -177,9 +199,18 @@ void start(
      *     requested batch slot
      * @return a future which is completed with newly allocated batch slot
      */
+    @Nonnull
+    default CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
+            @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {

Review comment:
       should we get rid of `@Nonnull` annotations here? We should be implicitly treating everything as non-null and only annotate `@Nullable` parameters to reduce noise.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
##########
@@ -45,15 +50,29 @@
 import static org.junit.Assert.assertThat;
 
 /** Tests for the {@link DeclarativeSlotPoolBridge}. */
+@RunWith(Parameterized.class)
 public class DeclarativeSlotPoolBridgeResourceDeclarationTest extends TestLogger {
 
     private static final JobMasterId jobMasterId = JobMasterId.generate();
     private final ComponentMainThreadExecutor mainThreadExecutor =
             ComponentMainThreadExecutorServiceAdapter.forMainThread();
+    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
 
     private RequirementListener requirementListener;
     private DeclarativeSlotPoolBridge declarativeSlotPoolBridge;
 
+    @Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}")
+    public static Collection<RequestSlotMatchingStrategy> data() throws IOException {

Review comment:
       👍 

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
##########
@@ -55,13 +59,26 @@
 import static org.junit.Assert.fail;
 
 /** Tests for the {@link DeclarativeSlotPoolBridge}. */
+@RunWith(Parameterized.class)
 public class DeclarativeSlotPoolBridgeTest extends TestLogger {
 
     private static final Time rpcTimeout = Time.seconds(20);
     private static final JobID jobId = new JobID();
     private static final JobMasterId jobMasterId = JobMasterId.generate();
     private final ComponentMainThreadExecutor mainThreadExecutor =
             ComponentMainThreadExecutorServiceAdapter.forMainThread();
+    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
+
+    @Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}")
+    public static Collection<RequestSlotMatchingStrategy> data() throws IOException {
+        return Arrays.asList(
+                SimpleRequestSlotMatchingStrategy.INSTANCE,
+                PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
+    }
+
+    public DeclarativeSlotPoolBridgeTest(RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+        this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;

Review comment:
       This tests that the different slot matching strategy doesn't break anything. Should we also test that it's really taken into account? (same with the `DeclarativeSlotPoolBridgeResourceDeclarationTest.java`)

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+
+/**
+ * Simple implementation of the {@link RequestSlotMatchingStrategy} that matches the pending
+ * requests in order as long as the resource profile can be fulfilled.
+ */
+public enum SimpleRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Collection<RequestSlotMatching> matchRequestsAndSlots(
+            Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) {
+        final Collection<RequestSlotMatching> resultingMatchings = new ArrayList<>();
+
+        // if pendingRequests has a special order, then let's preserve it
+        final LinkedHashMap<SlotRequestId, PendingRequest> pendingRequestsIndex =

Review comment:
       Why do we need an index here? Are we expecting duplicates? Can we simply replace this with a LinkedList?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+
+/**
+ * Simple implementation of the {@link RequestSlotMatchingStrategy} that matches the pending
+ * requests in order as long as the resource profile can be fulfilled.
+ */
+public enum SimpleRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Collection<RequestSlotMatching> matchRequestsAndSlots(
+            Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) {
+        final Collection<RequestSlotMatching> resultingMatchings = new ArrayList<>();

Review comment:
       nit:
   ```suggestion
           final Collection<RequestSlotMatching> resultingMatches = new ArrayList<>();
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import java.util.Collection;
+
+/** Strategy to match slot requests to slots. */
+public interface RequestSlotMatchingStrategy {
+
+    /**
+     * Match the given slots with the given collection of pending requests.
+     *
+     * @param slots slots to match
+     * @param pendingRequests slot requests to match
+     * @return resulting matchings of this operation
+     */
+    Collection<RequestSlotMatching> matchRequestsAndSlots(
+            Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests);
+
+    /** Result class representing matchings. */
+    final class RequestSlotMatching {

Review comment:
       ```suggestion
       final class RequestSlotMatch {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
##########
@@ -162,9 +163,30 @@ void start(
      * @return a newly allocated slot that was previously not available.
      */
     @Nonnull
+    default CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
+            @Nonnull SlotRequestId slotRequestId,
+            @Nonnull ResourceProfile resourceProfile,
+            @Nullable Time timeout) {
+        return requestNewAllocatedSlot(
+                slotRequestId, resourceProfile, Collections.emptyList(), timeout);
+    }
+
+    /**
+     * Request the allocation of a new slot from the resource manager. This method will not return a
+     * slot from the already available slots from the pool, but instead will add a new slot to that
+     * pool that is immediately allocated and returned.
+     *
+     * @param slotRequestId identifying the requested slot
+     * @param resourceProfile resource profile that specifies the resource requirements for the
+     *     requested slot
+     * @param preferredAllocations preferred allocations for the new allocated slot
+     * @param timeout timeout for the allocation procedure
+     * @return a newly allocated slot that was previously not available.
+     */
     CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
             @Nonnull SlotRequestId slotRequestId,
             @Nonnull ResourceProfile resourceProfile,
+            @Nonnull Collection<AllocationID> preferredAllocations,

Review comment:
       I'm wondering whether we should also make use of preferred locations as they can potentially address the data locality issue as well 🤔 

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+final class PendingRequest {
+
+    private final SlotRequestId slotRequestId;
+
+    private final ResourceProfile resourceProfile;
+
+    private final HashSet<AllocationID> preferredAllocations;
+
+    private final CompletableFuture<PhysicalSlot> slotFuture;
+
+    private final boolean isBatchRequest;
+
+    private long unfulfillableSince;
+
+    private PendingRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations,
+            boolean isBatchRequest) {
+        this.slotRequestId = slotRequestId;
+        this.resourceProfile = resourceProfile;
+        this.preferredAllocations = new HashSet<>(preferredAllocations);
+        this.isBatchRequest = isBatchRequest;
+        this.slotFuture = new CompletableFuture<>();
+        this.unfulfillableSince = Long.MAX_VALUE;
+    }
+
+    static PendingRequest createBatchRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations) {
+        return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, true);
+    }
+
+    static PendingRequest createNormalRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations) {
+        return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, false);
+    }
+
+    SlotRequestId getSlotRequestId() {
+        return slotRequestId;
+    }
+
+    ResourceProfile getResourceProfile() {
+        return resourceProfile;
+    }
+
+    public Set<AllocationID> getPreferredAllocations() {
+        return preferredAllocations;
+    }
+
+    CompletableFuture<PhysicalSlot> getSlotFuture() {
+        return slotFuture;
+    }
+
+    void failRequest(Exception cause) {
+        slotFuture.completeExceptionally(cause);
+    }
+
+    public boolean isBatchRequest() {
+        return isBatchRequest;
+    }
+
+    public void markFulfillable() {

Review comment:
       should we have all "public" methods either public or package private (same as the class)?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -102,12 +103,16 @@ public SlotProfile getSlotProfile(
                         preferredLocationsRetriever.getPreferredLocations(
                                 execution, producersToIgnore));
             }
-            return SlotProfile.priorAllocation(
-                    physicalSlotResourceProfile,
-                    physicalSlotResourceProfile,
-                    preferredLocations,
-                    priorAllocations,
-                    reservedAllocationIds);
+
+            final SlotProfile slotProfile =

Review comment:
       Why do we need this change?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@link RequestSlotMatchingStrategy} that takes the preferred allocations into account. The
+ * strategy will try to fulfill the preferred allocations and if this is not possible, then it will
+ * fall back to {@link SimpleRequestSlotMatchingStrategy}.
+ */
+public enum PreferredAllocationRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Collection<RequestSlotMatching> matchRequestsAndSlots(
+            Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) {
+        final Collection<RequestSlotMatching> requestSlotMatchings = new ArrayList<>();
+
+        final Map<AllocationID, PhysicalSlot> freeSlots =
+                slots.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        PhysicalSlot::getAllocationId, Function.identity()));
+
+        final Map<SlotRequestId, PendingRequest> pendingRequestsWithPreferredAllocations =
+                new HashMap<>();
+        final List<PendingRequest> unmatchedRequests = new ArrayList<>();
+
+        // Split requests into those that have preferred allocations and those that don't have
+        for (PendingRequest pendingRequest : pendingRequests) {
+            if (pendingRequest.getPreferredAllocations().isEmpty()) {
+                unmatchedRequests.add(pendingRequest);
+            } else {
+                pendingRequestsWithPreferredAllocations.put(
+                        pendingRequest.getSlotRequestId(), pendingRequest);
+            }
+        }
+
+        Iterator<PhysicalSlot> freeSlotsIterator = freeSlots.values().iterator();
+        // Match slots and pending requests based on preferred allocation
+        while (freeSlotsIterator.hasNext() && !pendingRequestsWithPreferredAllocations.isEmpty()) {
+            final PhysicalSlot freeSlot = freeSlotsIterator.next();
+
+            final Iterator<PendingRequest> pendingRequestIterator =
+                    pendingRequestsWithPreferredAllocations.values().iterator();
+
+            while (pendingRequestIterator.hasNext()) {
+                final PendingRequest pendingRequest = pendingRequestIterator.next();
+
+                if (freeSlot.getResourceProfile().isMatching(pendingRequest.getResourceProfile())
+                        && pendingRequest
+                                .getPreferredAllocations()
+                                .contains(freeSlot.getAllocationId())) {
+                    requestSlotMatchings.add(
+                            RequestSlotMatching.createFor(pendingRequest, freeSlot));
+                    pendingRequestIterator.remove();
+                    freeSlotsIterator.remove();
+                    break;
+                }
+            }
+        }
+
+        unmatchedRequests.addAll(pendingRequestsWithPreferredAllocations.values());
+        if (!freeSlots.isEmpty() && !unmatchedRequests.isEmpty()) {
+            requestSlotMatchings.addAll(
+                    SimpleRequestSlotMatchingStrategy.INSTANCE.matchRequestsAndSlots(

Review comment:
       👍 

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@link RequestSlotMatchingStrategy} that takes the preferred allocations into account. The
+ * strategy will try to fulfill the preferred allocations and if this is not possible, then it will
+ * fall back to {@link SimpleRequestSlotMatchingStrategy}.
+ */
+public enum PreferredAllocationRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Collection<RequestSlotMatching> matchRequestsAndSlots(
+            Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) {
+        final Collection<RequestSlotMatching> requestSlotMatchings = new ArrayList<>();
+
+        final Map<AllocationID, PhysicalSlot> freeSlots =
+                slots.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        PhysicalSlot::getAllocationId, Function.identity()));
+
+        final Map<SlotRequestId, PendingRequest> pendingRequestsWithPreferredAllocations =
+                new HashMap<>();
+        final List<PendingRequest> unmatchedRequests = new ArrayList<>();
+
+        // Split requests into those that have preferred allocations and those that don't have
+        for (PendingRequest pendingRequest : pendingRequests) {
+            if (pendingRequest.getPreferredAllocations().isEmpty()) {
+                unmatchedRequests.add(pendingRequest);
+            } else {
+                pendingRequestsWithPreferredAllocations.put(
+                        pendingRequest.getSlotRequestId(), pendingRequest);
+            }
+        }
+
+        Iterator<PhysicalSlot> freeSlotsIterator = freeSlots.values().iterator();

Review comment:
       nit
   ```suggestion
           final Iterator<PhysicalSlot> freeSlotsIterator = freeSlots.values().iterator();
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
##########
@@ -184,6 +189,25 @@ public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(
                 slotPoolServiceFactory, schedulerNGFactory);
     }
 
+    private static RequestSlotMatchingStrategy getRequestSlotMatchingStrategy(

Review comment:
       Should we have a test for this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r783151361



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
##########
@@ -177,9 +199,18 @@ void start(
      *     requested batch slot
      * @return a future which is completed with newly allocated batch slot
      */
+    @Nonnull
+    default CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
+            @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {

Review comment:
       Yes, I will make it consistent in this class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r782904887



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+
+/**
+ * Simple implementation of the {@link RequestSlotMatchingStrategy} that matches the pending
+ * requests in order as long as the resource profile can be fulfilled.
+ */
+public enum SimpleRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Collection<RequestSlotMatching> matchRequestsAndSlots(
+            Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) {
+        final Collection<RequestSlotMatching> resultingMatchings = new ArrayList<>();
+
+        // if pendingRequests has a special order, then let's preserve it
+        final LinkedHashMap<SlotRequestId, PendingRequest> pendingRequestsIndex =

Review comment:
       We don't. I think you are right and we should use a `LinkedList` here. Will change it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #18286:
URL: https://github.com/apache/flink/pull/18286#discussion_r783155440



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+final class PendingRequest {
+
+    private final SlotRequestId slotRequestId;
+
+    private final ResourceProfile resourceProfile;
+
+    private final HashSet<AllocationID> preferredAllocations;
+
+    private final CompletableFuture<PhysicalSlot> slotFuture;
+
+    private final boolean isBatchRequest;
+
+    private long unfulfillableSince;
+
+    private PendingRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations,
+            boolean isBatchRequest) {
+        this.slotRequestId = slotRequestId;
+        this.resourceProfile = resourceProfile;
+        this.preferredAllocations = new HashSet<>(preferredAllocations);
+        this.isBatchRequest = isBatchRequest;
+        this.slotFuture = new CompletableFuture<>();
+        this.unfulfillableSince = Long.MAX_VALUE;
+    }
+
+    static PendingRequest createBatchRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations) {
+        return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, true);
+    }
+
+    static PendingRequest createNormalRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations) {
+        return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, false);
+    }
+
+    SlotRequestId getSlotRequestId() {
+        return slotRequestId;
+    }
+
+    ResourceProfile getResourceProfile() {
+        return resourceProfile;
+    }
+
+    public Set<AllocationID> getPreferredAllocations() {
+        return preferredAllocations;
+    }
+
+    CompletableFuture<PhysicalSlot> getSlotFuture() {
+        return slotFuture;
+    }
+
+    void failRequest(Exception cause) {
+        slotFuture.completeExceptionally(cause);
+    }
+
+    public boolean isBatchRequest() {
+        return isBatchRequest;
+    }
+
+    public void markFulfillable() {

Review comment:
       I think this makes sense. I will change it.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+final class PendingRequest {
+
+    private final SlotRequestId slotRequestId;
+
+    private final ResourceProfile resourceProfile;
+
+    private final HashSet<AllocationID> preferredAllocations;
+
+    private final CompletableFuture<PhysicalSlot> slotFuture;
+
+    private final boolean isBatchRequest;
+
+    private long unfulfillableSince;
+
+    private PendingRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations,
+            boolean isBatchRequest) {
+        this.slotRequestId = slotRequestId;
+        this.resourceProfile = resourceProfile;
+        this.preferredAllocations = new HashSet<>(preferredAllocations);
+        this.isBatchRequest = isBatchRequest;
+        this.slotFuture = new CompletableFuture<>();
+        this.unfulfillableSince = Long.MAX_VALUE;
+    }
+
+    static PendingRequest createBatchRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations) {
+        return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, true);
+    }
+
+    static PendingRequest createNormalRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations) {
+        return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, false);
+    }
+
+    SlotRequestId getSlotRequestId() {
+        return slotRequestId;
+    }
+
+    ResourceProfile getResourceProfile() {
+        return resourceProfile;
+    }
+
+    public Set<AllocationID> getPreferredAllocations() {
+        return preferredAllocations;
+    }
+
+    CompletableFuture<PhysicalSlot> getSlotFuture() {
+        return slotFuture;
+    }
+
+    void failRequest(Exception cause) {
+        slotFuture.completeExceptionally(cause);
+    }
+
+    public boolean isBatchRequest() {
+        return isBatchRequest;
+    }
+
+    public void markFulfillable() {

Review comment:
       I think this makes sense. I will change it to package private.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann closed pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann closed pull request #18286:
URL: https://github.com/apache/flink/pull/18286


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006561511


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044",
       "triggerID" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29333",
       "triggerID" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 04df41b87646cb470605c2b300f68293f49bacc7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044) 
   * d1f9367d455c13cb079f03759cf9ec8d8e280f1e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29333) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1011173194


   Thanks a lot for the review @dmvk. I've addressed your comments and pushed an update. Please take another look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006561511


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 04df41b87646cb470605c2b300f68293f49bacc7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006561511


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044",
       "triggerID" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 04df41b87646cb470605c2b300f68293f49bacc7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18286: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18286:
URL: https://github.com/apache/flink/pull/18286#issuecomment-1006561511


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29044",
       "triggerID" : "04df41b87646cb470605c2b300f68293f49bacc7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29333",
       "triggerID" : "d1f9367d455c13cb079f03759cf9ec8d8e280f1e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "93cdb5d6803fd363ba7d431173fc8ee78a7f4f8b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "93cdb5d6803fd363ba7d431173fc8ee78a7f4f8b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d1f9367d455c13cb079f03759cf9ec8d8e280f1e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29333) 
   * 93cdb5d6803fd363ba7d431173fc8ee78a7f4f8b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org