You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/01/15 10:12:58 UTC

[flink] branch master updated (56ba36d -> fe0ce24)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 56ba36d  [FLINK-25034][runtime] Let InputGateDeploymentDescriptor supports subpartition range.
     new c42d10d  [hotfix] Port DefaultSlotPoolServiceSchedulerFactoryTest to use Junit5
     new 4a98a10  [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge
     new fe0ce24  [hotfix] Remove @Nonnull annotations from SlotPool

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../clusterframework/types/SlotProfile.java        |  16 ++
 .../flink/runtime/executiongraph/Execution.java    |   3 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java    |  27 ++-
 .../slotpool/DeclarativeSlotPoolBridge.java        | 192 +++++----------------
 .../DeclarativeSlotPoolBridgeServiceFactory.java   |   9 +-
 .../runtime/jobmaster/slotpool/PendingRequest.java | 132 ++++++++++++++
 .../slotpool/PhysicalSlotProviderImpl.java         |   9 +-
 ...erredAllocationRequestSlotMatchingStrategy.java | 103 +++++++++++
 .../PreviousAllocationSlotSelectionStrategy.java   |   8 +
 .../slotpool/RequestSlotMatchingStrategy.java      |  58 +++++++
 .../SimpleRequestSlotMatchingStrategy.java         |  61 +++++++
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |  44 +++--
 .../scheduler/ExecutionSlotSharingGroup.java       |  10 ++
 .../MergingSharedSlotProfileRetrieverFactory.java  |   2 +
 ...DefaultSlotPoolServiceSchedulerFactoryTest.java |  62 +++++--
 .../flink/runtime/jobmaster/JobMasterTest.java     |   5 +-
 .../slotpool/DeclarativeSlotPoolBridgeBuilder.java |  12 +-
 ...tiveSlotPoolBridgePreferredAllocationsTest.java | 104 +++++++++++
 ...ativeSlotPoolBridgeResourceDeclarationTest.java |  23 ++-
 .../slotpool/DeclarativeSlotPoolBridgeTest.java    |  38 +++-
 ...dAllocationRequestSlotMatchingStrategyTest.java |  77 +++++++++
 .../SimpleRequestSlotMatchingStrategyTest.java     |  99 +++++++++++
 22 files changed, 901 insertions(+), 193 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java

[flink] 02/03: [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4a98a10c931e4ff9750d38da5c260a35a54fb56a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Nov 19 13:44:50 2021 +0100

    [FLINK-25533] Forward preferred allocations into the DeclarativeSlotPoolBridge
    
    This commit forwards the preferred allocations into the DeclarativeSlotPoolBridge so that
    new slots can be matched against the preferred allocations.
    
    Moreover, the commit introduces a RequestSlotMatchingStrategy that is used by the DeclarativeSlotPoolBridge
    to match slots to pending requests. If local recovery is enabled, then Flink will use the
    PreferredAllocationRequestSlotMatchingStrategy that first tries to match pending requests to slots based on
    their preferred allocation and then falls back to SimpleRequestSlotMatchingStrategy that matches solely on the
    ResourceProfile basis.
    
    This closes #18286.
---
 .../clusterframework/types/SlotProfile.java        |  16 ++
 .../flink/runtime/executiongraph/Execution.java    |   3 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java    |  27 ++-
 .../slotpool/DeclarativeSlotPoolBridge.java        | 192 +++++----------------
 .../DeclarativeSlotPoolBridgeServiceFactory.java   |   9 +-
 .../runtime/jobmaster/slotpool/PendingRequest.java | 132 ++++++++++++++
 .../slotpool/PhysicalSlotProviderImpl.java         |   9 +-
 ...erredAllocationRequestSlotMatchingStrategy.java | 103 +++++++++++
 .../PreviousAllocationSlotSelectionStrategy.java   |   8 +
 .../slotpool/RequestSlotMatchingStrategy.java      |  58 +++++++
 .../SimpleRequestSlotMatchingStrategy.java         |  61 +++++++
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |  33 +++-
 .../scheduler/ExecutionSlotSharingGroup.java       |  10 ++
 .../MergingSharedSlotProfileRetrieverFactory.java  |   2 +
 ...DefaultSlotPoolServiceSchedulerFactoryTest.java |  32 ++++
 .../flink/runtime/jobmaster/JobMasterTest.java     |   5 +-
 .../slotpool/DeclarativeSlotPoolBridgeBuilder.java |  12 +-
 ...tiveSlotPoolBridgePreferredAllocationsTest.java | 104 +++++++++++
 ...ativeSlotPoolBridgeResourceDeclarationTest.java |  23 ++-
 .../slotpool/DeclarativeSlotPoolBridgeTest.java    |  38 +++-
 ...dAllocationRequestSlotMatchingStrategyTest.java |  77 +++++++++
 .../SimpleRequestSlotMatchingStrategyTest.java     |  99 +++++++++++
 22 files changed, 886 insertions(+), 167 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
index e57a089..a0dcb6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -122,4 +122,20 @@ public class SlotProfile {
                 priorAllocations,
                 reservedAllocations);
     }
+
+    @Override
+    public String toString() {
+        return "SlotProfile{"
+                + "taskResourceProfile="
+                + taskResourceProfile
+                + ", physicalSlotResourceProfile="
+                + physicalSlotResourceProfile
+                + ", preferredLocations="
+                + preferredLocations
+                + ", preferredAllocations="
+                + preferredAllocations
+                + ", reservedAllocations="
+                + reservedAllocations
+                + '}';
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2719c92..4b42470 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -548,10 +548,11 @@ public class Execution
             }
 
             LOG.info(
-                    "Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}",
+                    "Deploying {} (attempt #{}) with attempt id {} and vertex id {} to {} with allocation id {}",
                     vertex.getTaskNameWithSubtaskIndex(),
                     attemptNumber,
                     vertex.getCurrentExecutionAttempt().getAttemptId(),
+                    vertex.getID(),
                     getAssignedResourceLocation(),
                     slot.getAllocationId());
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 7298fba..03300ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -35,6 +36,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeServiceFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolServiceFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.PreferredAllocationRequestSlotMatchingStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.SimpleRequestSlotMatchingStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolServiceFactory;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
@@ -165,7 +169,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                                 SystemClock.getInstance(),
                                 rpcTimeout,
                                 slotIdleTimeout,
-                                batchSlotTimeout);
+                                batchSlotTimeout,
+                                getRequestSlotMatchingStrategy(configuration, jobType));
                 break;
             case Adaptive:
                 schedulerNGFactory = getAdaptiveSchedulerFactoryFromConfiguration(configuration);
@@ -184,6 +189,26 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                 slotPoolServiceFactory, schedulerNGFactory);
     }
 
+    @VisibleForTesting
+    static RequestSlotMatchingStrategy getRequestSlotMatchingStrategy(
+            Configuration configuration, JobType jobType) {
+        final boolean isLocalRecoveryEnabled =
+                configuration.get(CheckpointingOptions.LOCAL_RECOVERY);
+
+        if (isLocalRecoveryEnabled) {
+            if (jobType == JobType.STREAMING) {
+                return PreferredAllocationRequestSlotMatchingStrategy.INSTANCE;
+            } else {
+                LOG.warn(
+                        "Batch jobs do not support local recovery. Falling back for request slot matching strategy to {}.",
+                        SimpleRequestSlotMatchingStrategy.class.getSimpleName());
+                return SimpleRequestSlotMatchingStrategy.INSTANCE;
+            }
+        } else {
+            return SimpleRequestSlotMatchingStrategy.INSTANCE;
+        }
+    }
+
     private static AdaptiveSchedulerFactory getAdaptiveSchedulerFactoryFromConfiguration(
             Configuration configuration) {
         Duration allocationTimeoutDefault = JobManagerOptions.RESOURCE_WAIT_TIMEOUT.defaultValue();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 5827683..5258f98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -59,6 +59,8 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
     private final Map<SlotRequestId, AllocationID> fulfilledRequests;
     private final Time idleSlotTimeout;
 
+    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
+
     @Nullable private ComponentMainThreadExecutor componentMainThreadExecutor;
 
     private final Time batchSlotTimeout;
@@ -70,11 +72,18 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
             Clock clock,
             Time rpcTimeout,
             Time idleSlotTimeout,
-            Time batchSlotTimeout) {
+            Time batchSlotTimeout,
+            RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
         super(jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, rpcTimeout);
 
         this.idleSlotTimeout = idleSlotTimeout;
         this.batchSlotTimeout = Preconditions.checkNotNull(batchSlotTimeout);
+
+        log.debug(
+                "Using the request slot matching strategy: {}",
+                requestSlotMatchingStrategy.getClass().getSimpleName());
+        this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
+
         this.isBatchSlotRequestTimeoutCheckDisabled = false;
 
         this.pendingRequests = new LinkedHashMap<>();
@@ -142,32 +151,35 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
 
     @VisibleForTesting
     void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
-        final Collection<PendingRequestSlotMatching> matchingsToFulfill = new ArrayList<>();
-
-        for (PhysicalSlot newSlot : newSlots) {
-            final Optional<PendingRequest> matchingPendingRequest =
-                    findMatchingPendingRequest(newSlot);
-
-            matchingPendingRequest.ifPresent(
-                    pendingRequest -> {
-                        Preconditions.checkNotNull(
-                                pendingRequests.remove(pendingRequest.getSlotRequestId()),
-                                "Cannot fulfill a non existing pending slot request.");
-                        reserveFreeSlot(
-                                pendingRequest.getSlotRequestId(),
-                                newSlot.getAllocationId(),
-                                pendingRequest.resourceProfile);
-
-                        matchingsToFulfill.add(
-                                PendingRequestSlotMatching.createFor(pendingRequest, newSlot));
-                    });
+        final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches =
+                requestSlotMatchingStrategy.matchRequestsAndSlots(
+                        newSlots, pendingRequests.values());
+
+        for (RequestSlotMatchingStrategy.RequestSlotMatch match : requestSlotMatches) {
+            final PendingRequest pendingRequest = match.getPendingRequest();
+            final PhysicalSlot slot = match.getSlot();
+
+            log.debug("Matched pending request {} with slot {}.", pendingRequest, slot);
+
+            Preconditions.checkNotNull(
+                    pendingRequests.remove(pendingRequest.getSlotRequestId()),
+                    "Cannot fulfill a non existing pending slot request.");
+
+            reserveFreeSlot(
+                    pendingRequest.getSlotRequestId(),
+                    slot.getAllocationId(),
+                    pendingRequest.getResourceProfile());
         }
 
         // we have to first reserve all matching slots before fulfilling the requests
         // otherwise it can happen that the scheduler reserves one of the new slots
         // for a request which has been triggered by fulfilling a pending request
-        for (PendingRequestSlotMatching pendingRequestSlotMatching : matchingsToFulfill) {
-            pendingRequestSlotMatching.fulfillPendingRequest();
+        for (RequestSlotMatchingStrategy.RequestSlotMatch requestSlotMatch : requestSlotMatches) {
+            final PendingRequest pendingRequest = requestSlotMatch.getPendingRequest();
+            final PhysicalSlot slot = requestSlotMatch.getSlot();
+
+            Preconditions.checkState(
+                    pendingRequest.fulfill(slot), "Pending requests must be fulfillable.");
         }
     }
 
@@ -180,20 +192,6 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
         fulfilledRequests.put(slotRequestId, allocationId);
     }
 
-    private Optional<PendingRequest> findMatchingPendingRequest(PhysicalSlot slot) {
-        final ResourceProfile resourceProfile = slot.getResourceProfile();
-
-        for (PendingRequest pendingRequest : pendingRequests.values()) {
-            if (resourceProfile.isMatching(pendingRequest.getResourceProfile())) {
-                log.debug("Matched slot {} to pending request {}.", slot, pendingRequest);
-                return Optional.of(pendingRequest);
-            }
-        }
-        log.debug("Could not match slot {} to any pending request.", slot);
-
-        return Optional.empty();
-    }
-
     @Override
     public Optional<PhysicalSlot> allocateAvailableSlot(
             @Nonnull SlotRequestId slotRequestId,
@@ -231,6 +229,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
     public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
             @Nonnull SlotRequestId slotRequestId,
             @Nonnull ResourceProfile resourceProfile,
+            @Nonnull Collection<AllocationID> preferredAllocations,
             @Nullable Time timeout) {
         assertRunningInMainThread();
 
@@ -240,7 +239,8 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
                 resourceProfile);
 
         final PendingRequest pendingRequest =
-                PendingRequest.createNormalRequest(slotRequestId, resourceProfile);
+                PendingRequest.createNormalRequest(
+                        slotRequestId, resourceProfile, preferredAllocations);
 
         return internalRequestNewSlot(pendingRequest, timeout);
     }
@@ -248,7 +248,9 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
     @Override
     @Nonnull
     public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
-            @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {
+            @Nonnull SlotRequestId slotRequestId,
+            @Nonnull ResourceProfile resourceProfile,
+            @Nonnull Collection<AllocationID> preferredAllocations) {
         assertRunningInMainThread();
 
         log.debug(
@@ -257,7 +259,8 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
                 resourceProfile);
 
         final PendingRequest pendingRequest =
-                PendingRequest.createBatchRequest(slotRequestId, resourceProfile);
+                PendingRequest.createBatchRequest(
+                        slotRequestId, resourceProfile, preferredAllocations);
 
         return internalRequestNewSlot(pendingRequest, null);
     }
@@ -495,115 +498,4 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
     boolean isBatchSlotRequestTimeoutCheckEnabled() {
         return !isBatchSlotRequestTimeoutCheckDisabled;
     }
-
-    private static final class PendingRequest {
-
-        private final SlotRequestId slotRequestId;
-
-        private final ResourceProfile resourceProfile;
-
-        private final CompletableFuture<PhysicalSlot> slotFuture;
-
-        private final boolean isBatchRequest;
-
-        private long unfulfillableSince;
-
-        private PendingRequest(
-                SlotRequestId slotRequestId,
-                ResourceProfile resourceProfile,
-                boolean isBatchRequest) {
-            this.slotRequestId = slotRequestId;
-            this.resourceProfile = resourceProfile;
-            this.isBatchRequest = isBatchRequest;
-            this.slotFuture = new CompletableFuture<>();
-            this.unfulfillableSince = Long.MAX_VALUE;
-        }
-
-        static PendingRequest createBatchRequest(
-                SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
-            return new PendingRequest(slotRequestId, resourceProfile, true);
-        }
-
-        static PendingRequest createNormalRequest(
-                SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
-            return new PendingRequest(slotRequestId, resourceProfile, false);
-        }
-
-        SlotRequestId getSlotRequestId() {
-            return slotRequestId;
-        }
-
-        ResourceProfile getResourceProfile() {
-            return resourceProfile;
-        }
-
-        CompletableFuture<PhysicalSlot> getSlotFuture() {
-            return slotFuture;
-        }
-
-        void failRequest(Exception cause) {
-            slotFuture.completeExceptionally(cause);
-        }
-
-        public boolean isBatchRequest() {
-            return isBatchRequest;
-        }
-
-        public void markFulfillable() {
-            this.unfulfillableSince = Long.MAX_VALUE;
-        }
-
-        public void markUnfulfillable(long currentTimestamp) {
-            if (isFulfillable()) {
-                this.unfulfillableSince = currentTimestamp;
-            }
-        }
-
-        private boolean isFulfillable() {
-            return this.unfulfillableSince == Long.MAX_VALUE;
-        }
-
-        public long getUnfulfillableSince() {
-            return unfulfillableSince;
-        }
-
-        public boolean fulfill(PhysicalSlot slot) {
-            return slotFuture.complete(slot);
-        }
-
-        @Override
-        public String toString() {
-            return "PendingRequest{"
-                    + "slotRequestId="
-                    + slotRequestId
-                    + ", resourceProfile="
-                    + resourceProfile
-                    + ", isBatchRequest="
-                    + isBatchRequest
-                    + ", unfulfillableSince="
-                    + unfulfillableSince
-                    + '}';
-        }
-    }
-
-    private static final class PendingRequestSlotMatching {
-        private final PendingRequest pendingRequest;
-        private final PhysicalSlot matchedSlot;
-
-        private PendingRequestSlotMatching(
-                PendingRequest pendingRequest, PhysicalSlot matchedSlot) {
-            this.pendingRequest = pendingRequest;
-            this.matchedSlot = matchedSlot;
-        }
-
-        public static PendingRequestSlotMatching createFor(
-                PendingRequest pendingRequest, PhysicalSlot newSlot) {
-            return new PendingRequestSlotMatching(pendingRequest, newSlot);
-        }
-
-        public void fulfillPendingRequest() {
-            Preconditions.checkState(
-                    pendingRequest.fulfill(matchedSlot), "Pending requests must be fulfillable.");
-        }
-    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
index 6169a22..a298a64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
@@ -27,12 +27,16 @@ import javax.annotation.Nonnull;
 /** Factory for {@link DeclarativeSlotPoolBridge}. */
 public class DeclarativeSlotPoolBridgeServiceFactory extends AbstractSlotPoolServiceFactory {
 
+    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
+
     public DeclarativeSlotPoolBridgeServiceFactory(
             @Nonnull Clock clock,
             @Nonnull Time rpcTimeout,
             @Nonnull Time slotIdleTimeout,
-            @Nonnull Time batchSlotTimeout) {
+            @Nonnull Time batchSlotTimeout,
+            @Nonnull RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
         super(clock, rpcTimeout, slotIdleTimeout, batchSlotTimeout);
+        this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
     }
 
     @Nonnull
@@ -44,6 +48,7 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends AbstractSlotPoolSer
                 clock,
                 rpcTimeout,
                 slotIdleTimeout,
-                batchSlotTimeout);
+                batchSlotTimeout,
+                requestSlotMatchingStrategy);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
new file mode 100644
index 0000000..376df27
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+final class PendingRequest {
+
+    private final SlotRequestId slotRequestId;
+
+    private final ResourceProfile resourceProfile;
+
+    private final HashSet<AllocationID> preferredAllocations;
+
+    private final CompletableFuture<PhysicalSlot> slotFuture;
+
+    private final boolean isBatchRequest;
+
+    private long unfulfillableSince;
+
+    private PendingRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations,
+            boolean isBatchRequest) {
+        this.slotRequestId = slotRequestId;
+        this.resourceProfile = resourceProfile;
+        this.preferredAllocations = new HashSet<>(preferredAllocations);
+        this.isBatchRequest = isBatchRequest;
+        this.slotFuture = new CompletableFuture<>();
+        this.unfulfillableSince = Long.MAX_VALUE;
+    }
+
+    static PendingRequest createBatchRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations) {
+        return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, true);
+    }
+
+    static PendingRequest createNormalRequest(
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations) {
+        return new PendingRequest(slotRequestId, resourceProfile, preferredAllocations, false);
+    }
+
+    SlotRequestId getSlotRequestId() {
+        return slotRequestId;
+    }
+
+    ResourceProfile getResourceProfile() {
+        return resourceProfile;
+    }
+
+    Set<AllocationID> getPreferredAllocations() {
+        return preferredAllocations;
+    }
+
+    CompletableFuture<PhysicalSlot> getSlotFuture() {
+        return slotFuture;
+    }
+
+    void failRequest(Exception cause) {
+        slotFuture.completeExceptionally(cause);
+    }
+
+    boolean isBatchRequest() {
+        return isBatchRequest;
+    }
+
+    void markFulfillable() {
+        this.unfulfillableSince = Long.MAX_VALUE;
+    }
+
+    void markUnfulfillable(long currentTimestamp) {
+        if (isFulfillable()) {
+            this.unfulfillableSince = currentTimestamp;
+        }
+    }
+
+    private boolean isFulfillable() {
+        return this.unfulfillableSince == Long.MAX_VALUE;
+    }
+
+    long getUnfulfillableSince() {
+        return unfulfillableSince;
+    }
+
+    boolean fulfill(PhysicalSlot slot) {
+        return slotFuture.complete(slot);
+    }
+
+    @Override
+    public String toString() {
+        return "PendingRequest{"
+                + "slotRequestId="
+                + slotRequestId
+                + ", resourceProfile="
+                + resourceProfile
+                + ", preferredAllocations="
+                + preferredAllocations
+                + ", isBatchRequest="
+                + isBatchRequest
+                + ", unfulfillableSince="
+                + unfulfillableSince
+                + '}';
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
index 051cf0e..433cf89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
@@ -71,6 +72,7 @@ public class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
                                         requestNewSlot(
                                                 slotRequestId,
                                                 resourceProfile,
+                                                slotProfile.getPreferredAllocations(),
                                                 physicalSlotRequest
                                                         .willSlotBeOccupiedIndefinitely()));
 
@@ -99,11 +101,14 @@ public class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
     private CompletableFuture<PhysicalSlot> requestNewSlot(
             SlotRequestId slotRequestId,
             ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations,
             boolean willSlotBeOccupiedIndefinitely) {
         if (willSlotBeOccupiedIndefinitely) {
-            return slotPool.requestNewAllocatedSlot(slotRequestId, resourceProfile, null);
+            return slotPool.requestNewAllocatedSlot(
+                    slotRequestId, resourceProfile, preferredAllocations, null);
         } else {
-            return slotPool.requestNewAllocatedBatchSlot(slotRequestId, resourceProfile);
+            return slotPool.requestNewAllocatedBatchSlot(
+                    slotRequestId, resourceProfile, preferredAllocations);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
new file mode 100644
index 0000000..dc65184
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@link RequestSlotMatchingStrategy} that takes the preferred allocations into account. The
+ * strategy will try to fulfill the preferred allocations and if this is not possible, then it will
+ * fall back to {@link SimpleRequestSlotMatchingStrategy}.
+ */
+public enum PreferredAllocationRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Collection<RequestSlotMatch> matchRequestsAndSlots(
+            Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) {
+        final Collection<RequestSlotMatch> requestSlotMatches = new ArrayList<>();
+
+        final Map<AllocationID, PhysicalSlot> freeSlots =
+                slots.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        PhysicalSlot::getAllocationId, Function.identity()));
+
+        final Map<SlotRequestId, PendingRequest> pendingRequestsWithPreferredAllocations =
+                new HashMap<>();
+        final List<PendingRequest> unmatchedRequests = new ArrayList<>();
+
+        // Split requests into those that have preferred allocations and those that don't have
+        for (PendingRequest pendingRequest : pendingRequests) {
+            if (pendingRequest.getPreferredAllocations().isEmpty()) {
+                unmatchedRequests.add(pendingRequest);
+            } else {
+                pendingRequestsWithPreferredAllocations.put(
+                        pendingRequest.getSlotRequestId(), pendingRequest);
+            }
+        }
+
+        final Iterator<PhysicalSlot> freeSlotsIterator = freeSlots.values().iterator();
+        // Match slots and pending requests based on preferred allocation
+        while (freeSlotsIterator.hasNext() && !pendingRequestsWithPreferredAllocations.isEmpty()) {
+            final PhysicalSlot freeSlot = freeSlotsIterator.next();
+
+            final Iterator<PendingRequest> pendingRequestIterator =
+                    pendingRequestsWithPreferredAllocations.values().iterator();
+
+            while (pendingRequestIterator.hasNext()) {
+                final PendingRequest pendingRequest = pendingRequestIterator.next();
+
+                if (freeSlot.getResourceProfile().isMatching(pendingRequest.getResourceProfile())
+                        && pendingRequest
+                                .getPreferredAllocations()
+                                .contains(freeSlot.getAllocationId())) {
+                    requestSlotMatches.add(RequestSlotMatch.createFor(pendingRequest, freeSlot));
+                    pendingRequestIterator.remove();
+                    freeSlotsIterator.remove();
+                    break;
+                }
+            }
+        }
+
+        unmatchedRequests.addAll(pendingRequestsWithPreferredAllocations.values());
+        if (!freeSlots.isEmpty() && !unmatchedRequests.isEmpty()) {
+            requestSlotMatches.addAll(
+                    SimpleRequestSlotMatchingStrategy.INSTANCE.matchRequestsAndSlots(
+                            freeSlots.values(), unmatchedRequests));
+        }
+
+        return requestSlotMatches;
+    }
+
+    @Override
+    public String toString() {
+        return PreferredAllocationRequestSlotMatchingStrategy.class.getSimpleName();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
index 6a8a832..d9f507e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
@@ -22,6 +22,9 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
@@ -36,6 +39,9 @@ import java.util.Set;
  */
 public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PreviousAllocationSlotSelectionStrategy.class);
+
     private final SlotSelectionStrategy fallbackSlotSelectionStrategy;
 
     private PreviousAllocationSlotSelectionStrategy(
@@ -48,6 +54,8 @@ public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStr
             @Nonnull Collection<SlotInfoAndResources> availableSlots,
             @Nonnull SlotProfile slotProfile) {
 
+        LOG.debug("Select best slot for profile {}.", slotProfile);
+
         Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
 
         // First, if there was a prior allocation try to schedule to the same/old slot
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
new file mode 100644
index 0000000..05d7892
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import java.util.Collection;
+
+/** Strategy to match slot requests to slots. */
+public interface RequestSlotMatchingStrategy {
+
+    /**
+     * Match the given slots with the given collection of pending requests.
+     *
+     * @param slots slots to match
+     * @param pendingRequests slot requests to match
+     * @return resulting matches of this operation
+     */
+    Collection<RequestSlotMatch> matchRequestsAndSlots(
+            Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests);
+
+    /** Result class representing matches. */
+    final class RequestSlotMatch {
+        private final PendingRequest pendingRequest;
+        private final PhysicalSlot matchedSlot;
+
+        private RequestSlotMatch(PendingRequest pendingRequest, PhysicalSlot matchedSlot) {
+            this.pendingRequest = pendingRequest;
+            this.matchedSlot = matchedSlot;
+        }
+
+        PhysicalSlot getSlot() {
+            return matchedSlot;
+        }
+
+        PendingRequest getPendingRequest() {
+            return pendingRequest;
+        }
+
+        static RequestSlotMatch createFor(PendingRequest pendingRequest, PhysicalSlot newSlot) {
+            return new RequestSlotMatch(pendingRequest, newSlot);
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
new file mode 100644
index 0000000..d4e987f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * Simple implementation of the {@link RequestSlotMatchingStrategy} that matches the pending
+ * requests in order as long as the resource profile can be fulfilled.
+ */
+public enum SimpleRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Collection<RequestSlotMatch> matchRequestsAndSlots(
+            Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests) {
+        final Collection<RequestSlotMatch> resultingMatches = new ArrayList<>();
+
+        // if pendingRequests has a special order, then let's preserve it
+        final LinkedList<PendingRequest> pendingRequestsIndex = new LinkedList<>(pendingRequests);
+
+        for (PhysicalSlot slot : slots) {
+            final Iterator<PendingRequest> pendingRequestIterator = pendingRequestsIndex.iterator();
+
+            while (pendingRequestIterator.hasNext()) {
+                final PendingRequest pendingRequest = pendingRequestIterator.next();
+                if (slot.getResourceProfile().isMatching(pendingRequest.getResourceProfile())) {
+                    resultingMatches.add(RequestSlotMatch.createFor(pendingRequest, slot));
+                    pendingRequestIterator.remove();
+                    break;
+                }
+            }
+        }
+
+        return resultingMatches;
+    }
+
+    @Override
+    public String toString() {
+        return SimpleRequestSlotMatchingStrategy.class.getSimpleName();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 11eb46e..53e914d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -36,6 +36,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -162,9 +163,30 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
      * @return a newly allocated slot that was previously not available.
      */
     @Nonnull
+    default CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
+            @Nonnull SlotRequestId slotRequestId,
+            @Nonnull ResourceProfile resourceProfile,
+            @Nullable Time timeout) {
+        return requestNewAllocatedSlot(
+                slotRequestId, resourceProfile, Collections.emptyList(), timeout);
+    }
+
+    /**
+     * Request the allocation of a new slot from the resource manager. This method will not return a
+     * slot from the already available slots from the pool, but instead will add a new slot to that
+     * pool that is immediately allocated and returned.
+     *
+     * @param slotRequestId identifying the requested slot
+     * @param resourceProfile resource profile that specifies the resource requirements for the
+     *     requested slot
+     * @param preferredAllocations preferred allocations for the new allocated slot
+     * @param timeout timeout for the allocation procedure
+     * @return a newly allocated slot that was previously not available.
+     */
     CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
             @Nonnull SlotRequestId slotRequestId,
             @Nonnull ResourceProfile resourceProfile,
+            @Nonnull Collection<AllocationID> preferredAllocations,
             @Nullable Time timeout);
 
     /**
@@ -178,8 +200,17 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
      * @return a future which is completed with newly allocated batch slot
      */
     @Nonnull
+    default CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
+            @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {
+        return requestNewAllocatedBatchSlot(
+                slotRequestId, resourceProfile, Collections.emptyList());
+    }
+
+    @Nonnull
     CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
-            @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile);
+            @Nonnull SlotRequestId slotRequestId,
+            @Nonnull ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations);
 
     /**
      * Disables batch slot request timeout check. Invoked when someone else wants to take over the
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
index 7e4fa08..cf513eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
@@ -52,4 +52,14 @@ class ExecutionSlotSharingGroup {
     Set<ExecutionVertexID> getExecutionVertexIds() {
         return Collections.unmodifiableSet(executionVertexIds);
     }
+
+    @Override
+    public String toString() {
+        return "ExecutionSlotSharingGroup{"
+                + "executionVertexIds="
+                + executionVertexIds
+                + ", resourceProfile="
+                + resourceProfile
+                + '}';
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
index 1fbde93..f43d685 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
 /** Factory for {@link MergingSharedSlotProfileRetriever}. */
 class MergingSharedSlotProfileRetrieverFactory
         implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
+
     private final SyncPreferredLocationsRetriever preferredLocationsRetriever;
 
     private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
@@ -102,6 +103,7 @@ class MergingSharedSlotProfileRetrieverFactory
                         preferredLocationsRetriever.getPreferredLocations(
                                 execution, producersToIgnore));
             }
+
             return SlotProfile.priorAllocation(
                     physicalSlotResourceProfile,
                     physicalSlotResourceProfile,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
index e0df73a..f1d102a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
@@ -18,16 +18,25 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobmaster.slotpool.PreferredAllocationRequestSlotMatchingStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.SimpleRequestSlotMatchingStrategy;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
 import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerFactory;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -64,4 +73,27 @@ public class DefaultSlotPoolServiceSchedulerFactoryTest {
         assertThat(defaultSlotPoolServiceSchedulerFactory.getSchedulerType())
                 .isEqualTo(JobManagerOptions.SchedulerType.Adaptive);
     }
+
+    @ParameterizedTest
+    @MethodSource("testGetRequestSlotMatchingStrategy")
+    public void testGetRequestSlotMatchingStrategy(
+            boolean isLocalRecoveryEnabled, JobType jobType, RequestSlotMatchingStrategy expected) {
+        final Configuration configuration = new Configuration();
+        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, isLocalRecoveryEnabled);
+        assertThat(
+                        DefaultSlotPoolServiceSchedulerFactory.getRequestSlotMatchingStrategy(
+                                configuration, jobType))
+                .isSameAs(expected);
+    }
+
+    private static Stream<Arguments> testGetRequestSlotMatchingStrategy() {
+        return Stream.of(
+                Arguments.of(false, JobType.BATCH, SimpleRequestSlotMatchingStrategy.INSTANCE),
+                Arguments.of(false, JobType.STREAMING, SimpleRequestSlotMatchingStrategy.INSTANCE),
+                Arguments.of(true, JobType.BATCH, SimpleRequestSlotMatchingStrategy.INSTANCE),
+                Arguments.of(
+                        true,
+                        JobType.STREAMING,
+                        PreferredAllocationRequestSlotMatchingStrategy.INSTANCE));
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 380e2bd..9c335b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -596,6 +596,7 @@ public class JobMasterTest extends TestLogger {
         public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
                 @Nonnull SlotRequestId slotRequestId,
                 @Nonnull ResourceProfile resourceProfile,
+                @Nonnull Collection<AllocationID> preferredAllocations,
                 @Nullable Time timeout) {
             return new CompletableFuture<>();
         }
@@ -603,7 +604,9 @@ public class JobMasterTest extends TestLogger {
         @Nonnull
         @Override
         public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
-                @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {
+                @Nonnull SlotRequestId slotRequestId,
+                @Nonnull ResourceProfile resourceProfile,
+                @Nonnull Collection<AllocationID> preferredAllocations) {
             return new CompletableFuture<>();
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
index a5d926c..a16364d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
@@ -45,6 +45,9 @@ public class DeclarativeSlotPoolBridgeBuilder {
     @Nullable
     private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
 
+    private RequestSlotMatchingStrategy requestSlotMatchingStrategy =
+            SimpleRequestSlotMatchingStrategy.INSTANCE;
+
     public DeclarativeSlotPoolBridgeBuilder setResourceManagerGateway(
             @Nullable ResourceManagerGateway resourceManagerGateway) {
         this.resourceManagerGateway = resourceManagerGateway;
@@ -71,6 +74,12 @@ public class DeclarativeSlotPoolBridgeBuilder {
         return this;
     }
 
+    public DeclarativeSlotPoolBridgeBuilder setRequestSlotMatchingStrategy(
+            RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+        this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
+        return this;
+    }
+
     public DeclarativeSlotPoolBridge build() {
         return new DeclarativeSlotPoolBridge(
                 jobId,
@@ -78,7 +87,8 @@ public class DeclarativeSlotPoolBridgeBuilder {
                 clock,
                 TestingUtils.infiniteTime(),
                 idleSlotTimeout,
-                batchSlotTimeout);
+                batchSlotTimeout,
+                requestSlotMatchingStrategy);
     }
 
     public DeclarativeSlotPoolBridge buildAndStart(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
new file mode 100644
index 0000000..e695852
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(TestLoggerExtension.class)
+public class DeclarativeSlotPoolBridgePreferredAllocationsTest {
+
+    @Test
+    public void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() throws Exception {
+        final DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
+                new DeclarativeSlotPoolBridge(
+                        new JobID(),
+                        new DefaultDeclarativeSlotPoolFactory(),
+                        SystemClock.getInstance(),
+                        TestingUtils.infiniteTime(),
+                        TestingUtils.infiniteTime(),
+                        TestingUtils.infiniteTime(),
+                        PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
+
+        declarativeSlotPoolBridge.start(
+                JobMasterId.generate(),
+                "localhost",
+                ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        final LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
+
+        final AllocationID allocationId1 = new AllocationID();
+        final AllocationID allocationId2 = new AllocationID();
+
+        final CompletableFuture<PhysicalSlot> slotRequestWithPreferredAllocation1 =
+                requestSlot(declarativeSlotPoolBridge, Collections.singleton(allocationId1));
+        final CompletableFuture<PhysicalSlot> slotRequestWithEmptyPreferredAllocations =
+                requestSlot(declarativeSlotPoolBridge, Collections.emptySet());
+        final CompletableFuture<PhysicalSlot> slotRequestWithPreferredAllocation2 =
+                requestSlot(declarativeSlotPoolBridge, Collections.singleton(allocationId2));
+
+        final Collection<SlotOffer> slotOffers = new ArrayList<>();
+        slotOffers.add(new SlotOffer(allocationId2, 0, ResourceProfile.ANY));
+        final AllocationID otherAllocationId = new AllocationID();
+        slotOffers.add(new SlotOffer(otherAllocationId, 1, ResourceProfile.ANY));
+        slotOffers.add(new SlotOffer(allocationId1, 2, ResourceProfile.ANY));
+
+        declarativeSlotPoolBridge.registerTaskManager(localTaskManagerLocation.getResourceID());
+        declarativeSlotPoolBridge.offerSlots(
+                localTaskManagerLocation, new SimpleAckingTaskManagerGateway(), slotOffers);
+
+        assertThat(slotRequestWithPreferredAllocation1.join().getAllocationId())
+                .isEqualTo(allocationId1);
+        assertThat(slotRequestWithPreferredAllocation2.join().getAllocationId())
+                .isEqualTo(allocationId2);
+        assertThat(slotRequestWithEmptyPreferredAllocations.join().getAllocationId())
+                .isEqualTo(otherAllocationId);
+    }
+
+    @Nonnull
+    private CompletableFuture<PhysicalSlot> requestSlot(
+            DeclarativeSlotPoolBridge declarativeSlotPoolBridge,
+            Set<AllocationID> preferredAllocations) {
+        return declarativeSlotPoolBridge.requestNewAllocatedSlot(
+                new SlotRequestId(), ResourceProfile.UNKNOWN, preferredAllocations, null);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
index f9b7ca8..e5a5829 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
@@ -32,8 +32,13 @@ import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.io.IOException;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -45,15 +50,29 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
 /** Tests for the {@link DeclarativeSlotPoolBridge}. */
+@RunWith(Parameterized.class)
 public class DeclarativeSlotPoolBridgeResourceDeclarationTest extends TestLogger {
 
     private static final JobMasterId jobMasterId = JobMasterId.generate();
     private final ComponentMainThreadExecutor mainThreadExecutor =
             ComponentMainThreadExecutorServiceAdapter.forMainThread();
+    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
 
     private RequirementListener requirementListener;
     private DeclarativeSlotPoolBridge declarativeSlotPoolBridge;
 
+    @Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}")
+    public static Collection<RequestSlotMatchingStrategy> data() throws IOException {
+        return Arrays.asList(
+                SimpleRequestSlotMatchingStrategy.INSTANCE,
+                PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
+    }
+
+    public DeclarativeSlotPoolBridgeResourceDeclarationTest(
+            RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+        this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
+    }
+
     @Before
     public void setup() throws Exception {
         requirementListener = new RequirementListener();
@@ -76,7 +95,9 @@ public class DeclarativeSlotPoolBridgeResourceDeclarationTest extends TestLogger
 
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                 new TestingDeclarativeSlotPoolFactory(slotPoolBuilder);
-        declarativeSlotPoolBridge = createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory);
+        declarativeSlotPoolBridge =
+                createDeclarativeSlotPoolBridge(
+                        declarativeSlotPoolFactory, requestSlotMatchingStrategy);
     }
 
     @After
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
index 13845cf..ed1a7cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
@@ -39,11 +39,15 @@ import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -55,6 +59,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 
 /** Tests for the {@link DeclarativeSlotPoolBridge}. */
+@RunWith(Parameterized.class)
 public class DeclarativeSlotPoolBridgeTest extends TestLogger {
 
     private static final Time rpcTimeout = Time.seconds(20);
@@ -62,6 +67,18 @@ public class DeclarativeSlotPoolBridgeTest extends TestLogger {
     private static final JobMasterId jobMasterId = JobMasterId.generate();
     private final ComponentMainThreadExecutor mainThreadExecutor =
             ComponentMainThreadExecutorServiceAdapter.forMainThread();
+    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
+
+    @Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}")
+    public static Collection<RequestSlotMatchingStrategy> data() throws IOException {
+        return Arrays.asList(
+                SimpleRequestSlotMatchingStrategy.INSTANCE,
+                PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
+    }
+
+    public DeclarativeSlotPoolBridgeTest(RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+        this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
+    }
 
     @Test
     public void testSlotOffer() throws Exception {
@@ -72,7 +89,8 @@ public class DeclarativeSlotPoolBridgeTest extends TestLogger {
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                 new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+                createDeclarativeSlotPoolBridge(
+                        declarativeSlotPoolFactory, requestSlotMatchingStrategy)) {
 
             declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);
 
@@ -93,7 +111,8 @@ public class DeclarativeSlotPoolBridgeTest extends TestLogger {
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                 new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+                createDeclarativeSlotPoolBridge(
+                        declarativeSlotPoolFactory, requestSlotMatchingStrategy)) {
 
             declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);
 
@@ -141,7 +160,8 @@ public class DeclarativeSlotPoolBridgeTest extends TestLogger {
         final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
                 new TestingDeclarativeSlotPoolFactory(builder);
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory)) {
+                createDeclarativeSlotPoolBridge(
+                        declarativeSlotPoolFactory, requestSlotMatchingStrategy)) {
             declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);
 
             final SlotRequestId slotRequestId = new SlotRequestId();
@@ -157,7 +177,8 @@ public class DeclarativeSlotPoolBridgeTest extends TestLogger {
     @Test
     public void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws Exception {
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(new DefaultDeclarativeSlotPoolFactory())) {
+                createDeclarativeSlotPoolBridge(
+                        new DefaultDeclarativeSlotPoolFactory(), requestSlotMatchingStrategy)) {
 
             declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);
 
@@ -198,7 +219,8 @@ public class DeclarativeSlotPoolBridgeTest extends TestLogger {
     @Test
     public void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws Exception {
         try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
-                createDeclarativeSlotPoolBridge(new DefaultDeclarativeSlotPoolFactory())) {
+                createDeclarativeSlotPoolBridge(
+                        new DefaultDeclarativeSlotPoolFactory(), requestSlotMatchingStrategy)) {
 
             declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);
 
@@ -222,14 +244,16 @@ public class DeclarativeSlotPoolBridgeTest extends TestLogger {
 
     @Nonnull
     static DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
-            DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
         return new DeclarativeSlotPoolBridge(
                 jobId,
                 declarativeSlotPoolFactory,
                 SystemClock.getInstance(),
                 rpcTimeout,
                 Time.seconds(20),
-                Time.seconds(20));
+                Time.seconds(20),
+                requestSlotMatchingStrategy);
     }
 
     static PhysicalSlot createAllocatedSlot(AllocationID allocationID) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
new file mode 100644
index 0000000..711a751
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link PreferredAllocationRequestSlotMatchingStrategy}. */
+@ExtendWith(TestLoggerExtension.class)
+public class PreferredAllocationRequestSlotMatchingStrategyTest {
+
+    /**
+     * This test ensures that new slots are matched against the preferred allocationIds of the
+     * pending requests.
+     */
+    @Test
+    public void testNewSlotsAreMatchedAgainstPreferredAllocationIDs() throws Exception {
+        final PreferredAllocationRequestSlotMatchingStrategy strategy =
+                PreferredAllocationRequestSlotMatchingStrategy.INSTANCE;
+
+        final AllocationID allocationId1 = new AllocationID();
+        final AllocationID allocationId2 = new AllocationID();
+
+        final Collection<TestingPhysicalSlot> slots =
+                Arrays.asList(
+                        TestingPhysicalSlot.builder().withAllocationID(allocationId1).build(),
+                        TestingPhysicalSlot.builder().withAllocationID(allocationId2).build());
+        final Collection<PendingRequest> pendingRequests =
+                Arrays.asList(
+                        PendingRequest.createNormalRequest(
+                                new SlotRequestId(),
+                                ResourceProfile.UNKNOWN,
+                                Collections.singleton(allocationId2)),
+                        PendingRequest.createNormalRequest(
+                                new SlotRequestId(),
+                                ResourceProfile.UNKNOWN,
+                                Collections.singleton(allocationId1)));
+
+        final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches =
+                strategy.matchRequestsAndSlots(slots, pendingRequests);
+
+        assertThat(requestSlotMatches).hasSize(2);
+
+        for (RequestSlotMatchingStrategy.RequestSlotMatch requestSlotMatch : requestSlotMatches) {
+            assertThat(requestSlotMatch.getPendingRequest().getPreferredAllocations())
+                    .contains(requestSlotMatch.getSlot().getAllocationId());
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
new file mode 100644
index 0000000..c1dc935
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link SimpleRequestSlotMatchingStrategy}. */
+@ExtendWith(TestLoggerExtension.class)
+public class SimpleRequestSlotMatchingStrategyTest {
+
+    @Test
+    public void testSlotRequestsAreMatchedInOrder() {
+        final SimpleRequestSlotMatchingStrategy simpleRequestSlotMatchingStrategy =
+                SimpleRequestSlotMatchingStrategy.INSTANCE;
+
+        final Collection<PhysicalSlot> slots = Arrays.asList(TestingPhysicalSlot.builder().build());
+        final PendingRequest pendingRequest1 =
+                PendingRequest.createNormalRequest(
+                        new SlotRequestId(), ResourceProfile.UNKNOWN, Collections.emptyList());
+        final PendingRequest pendingRequest2 =
+                PendingRequest.createNormalRequest(
+                        new SlotRequestId(), ResourceProfile.UNKNOWN, Collections.emptyList());
+        final Collection<PendingRequest> pendingRequests =
+                Arrays.asList(pendingRequest1, pendingRequest2);
+
+        final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches =
+                simpleRequestSlotMatchingStrategy.matchRequestsAndSlots(slots, pendingRequests);
+
+        assertThat(requestSlotMatches).hasSize(1);
+        assertThat(
+                        Iterators.getOnlyElement(requestSlotMatches.iterator())
+                                .getPendingRequest()
+                                .getSlotRequestId())
+                .isEqualTo(pendingRequest1.getSlotRequestId());
+    }
+
+    @Test
+    public void testSlotRequestsThatCanBeFulfilledAreMatched() {
+        final SimpleRequestSlotMatchingStrategy simpleRequestSlotMatchingStrategy =
+                SimpleRequestSlotMatchingStrategy.INSTANCE;
+
+        final ResourceProfile small = ResourceProfile.newBuilder().setCpuCores(1.0).build();
+        final ResourceProfile large = ResourceProfile.newBuilder().setCpuCores(2.0).build();
+
+        final Collection<PhysicalSlot> slots =
+                Arrays.asList(
+                        TestingPhysicalSlot.builder().withResourceProfile(small).build(),
+                        TestingPhysicalSlot.builder().withResourceProfile(small).build());
+
+        final PendingRequest pendingRequest1 =
+                PendingRequest.createNormalRequest(
+                        new SlotRequestId(), large, Collections.emptyList());
+        final PendingRequest pendingRequest2 =
+                PendingRequest.createNormalRequest(
+                        new SlotRequestId(), small, Collections.emptyList());
+        final Collection<PendingRequest> pendingRequests =
+                Arrays.asList(pendingRequest1, pendingRequest2);
+
+        final Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches =
+                simpleRequestSlotMatchingStrategy.matchRequestsAndSlots(slots, pendingRequests);
+
+        assertThat(requestSlotMatches).hasSize(1);
+        assertThat(
+                        Iterators.getOnlyElement(requestSlotMatches.iterator())
+                                .getPendingRequest()
+                                .getSlotRequestId())
+                .isEqualTo(pendingRequest2.getSlotRequestId());
+    }
+}

[flink] 03/03: [hotfix] Remove @Nonnull annotations from SlotPool

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe0ce241ebae545ed86b1d0438c46421918ee19e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Jan 12 15:56:16 2022 +0100

    [hotfix] Remove @Nonnull annotations from SlotPool
---
 .../flink/runtime/jobmaster/slotpool/SlotPool.java | 27 ++++++++--------------
 1 file changed, 10 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 53e914d..7ea084b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
@@ -123,7 +122,6 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
      * @return a list of {@link SlotInfoWithUtilization} objects about all slots that are currently
      *     available in the slot pool.
      */
-    @Nonnull
     Collection<SlotInfoWithUtilization> getAvailableSlotsInformation();
 
     /**
@@ -147,9 +145,9 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
      *     allocation id exists
      */
     Optional<PhysicalSlot> allocateAvailableSlot(
-            @Nonnull SlotRequestId slotRequestId,
-            @Nonnull AllocationID allocationID,
-            @Nonnull ResourceProfile requirementProfile);
+            SlotRequestId slotRequestId,
+            AllocationID allocationID,
+            ResourceProfile requirementProfile);
 
     /**
      * Request the allocation of a new slot from the resource manager. This method will not return a
@@ -162,11 +160,8 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
      * @param timeout timeout for the allocation procedure
      * @return a newly allocated slot that was previously not available.
      */
-    @Nonnull
     default CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
-            @Nonnull SlotRequestId slotRequestId,
-            @Nonnull ResourceProfile resourceProfile,
-            @Nullable Time timeout) {
+            SlotRequestId slotRequestId, ResourceProfile resourceProfile, @Nullable Time timeout) {
         return requestNewAllocatedSlot(
                 slotRequestId, resourceProfile, Collections.emptyList(), timeout);
     }
@@ -184,9 +179,9 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
      * @return a newly allocated slot that was previously not available.
      */
     CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
-            @Nonnull SlotRequestId slotRequestId,
-            @Nonnull ResourceProfile resourceProfile,
-            @Nonnull Collection<AllocationID> preferredAllocations,
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            Collection<AllocationID> preferredAllocations,
             @Nullable Time timeout);
 
     /**
@@ -199,17 +194,15 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
      *     requested batch slot
      * @return a future which is completed with newly allocated batch slot
      */
-    @Nonnull
     default CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
-            @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {
+            SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
         return requestNewAllocatedBatchSlot(
                 slotRequestId, resourceProfile, Collections.emptyList());
     }
 
-    @Nonnull
     CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
-            @Nonnull SlotRequestId slotRequestId,
-            @Nonnull ResourceProfile resourceProfile,
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
             Collection<AllocationID> preferredAllocations);
 
     /**

[flink] 01/03: [hotfix] Port DefaultSlotPoolServiceSchedulerFactoryTest to use Junit5

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c42d10d004b5f71a1096a310807290b867c34839
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Jan 12 16:17:59 2022 +0100

    [hotfix] Port DefaultSlotPoolServiceSchedulerFactoryTest to use Junit5
---
 ...DefaultSlotPoolServiceSchedulerFactoryTest.java | 32 ++++++++++------------
 1 file changed, 14 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
index 92a8b9f..e0df73a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
@@ -24,16 +24,16 @@ import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
 import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerFactory;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DefaultSlotPoolServiceSchedulerFactory}. */
-public class DefaultSlotPoolServiceSchedulerFactoryTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class DefaultSlotPoolServiceSchedulerFactoryTest {
 
     @Test
     public void testFallsBackToDefaultSchedulerIfBatchJob() {
@@ -44,12 +44,10 @@ public class DefaultSlotPoolServiceSchedulerFactoryTest extends TestLogger {
                 DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                         configuration, JobType.BATCH);
 
-        assertThat(
-                defaultSlotPoolServiceSchedulerFactory.getSchedulerNGFactory(),
-                is(instanceOf(DefaultSchedulerFactory.class)));
-        assertThat(
-                defaultSlotPoolServiceSchedulerFactory.getSchedulerType(),
-                is(JobManagerOptions.SchedulerType.Ng));
+        assertThat(defaultSlotPoolServiceSchedulerFactory.getSchedulerNGFactory())
+                .isInstanceOf(DefaultSchedulerFactory.class);
+        assertThat(defaultSlotPoolServiceSchedulerFactory.getSchedulerType())
+                .isEqualTo(JobManagerOptions.SchedulerType.Ng);
     }
 
     @Test
@@ -61,11 +59,9 @@ public class DefaultSlotPoolServiceSchedulerFactoryTest extends TestLogger {
                 DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                         configuration, JobType.STREAMING);
 
-        assertThat(
-                defaultSlotPoolServiceSchedulerFactory.getSchedulerNGFactory(),
-                is(instanceOf(AdaptiveSchedulerFactory.class)));
-        assertThat(
-                defaultSlotPoolServiceSchedulerFactory.getSchedulerType(),
-                is(JobManagerOptions.SchedulerType.Adaptive));
+        assertThat(defaultSlotPoolServiceSchedulerFactory.getSchedulerNGFactory())
+                .isInstanceOf(AdaptiveSchedulerFactory.class);
+        assertThat(defaultSlotPoolServiceSchedulerFactory.getSchedulerType())
+                .isEqualTo(JobManagerOptions.SchedulerType.Adaptive);
     }
 }