Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/27 04:09:37 UTC

[GitHub] [flink] azagrebin commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

azagrebin commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r429801548



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotOccupationTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests whether the slot occupation works correctly.
+ */
+public class SlotOccupationTest extends TestLogger {
+
+	@Test
+	public void testSingleTaskOccupyingSlotIndefinitely() {
+		final PhysicalSlot physicalSlot = createPhysicalSlot();
+		allocateSingleLogicalSlotFromPhysicalSlot(physicalSlot, true);
+
+		assertTrue(physicalSlot.willBeOccupiedIndefinitely());
+	}
+
+	@Test
+	public void testSingleTaskNotOccupyingSlotIndefinitely() {
+		final PhysicalSlot physicalSlot = createPhysicalSlot();
+		allocateSingleLogicalSlotFromPhysicalSlot(physicalSlot, true);
+
+		assertTrue(physicalSlot.willBeOccupiedIndefinitely());

Review comment:
       ```suggestion
   		allocateSingleLogicalSlotFromPhysicalSlot(physicalSlot, false);
   
   		assertFalse(physicalSlot.willBeOccupiedIndefinitely());
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -469,6 +469,47 @@ public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwabl
 			.collect(Collectors.toList());
 	}
 
+	@Override
+	public boolean isSlotRequestBulkFulfillable(final PhysicalSlotRequestBulk slotRequestBulk) {
+		final Set<AllocationID> assignedSlots = new HashSet<>(slotRequestBulk.getFulfilledRequests().values());
+		final Set<SlotInfo> reusableSlots = getReusableSlots(assignedSlots);
+		return areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests().values(), reusableSlots);
+	}
+
+	private Set<SlotInfo> getReusableSlots(final Set<AllocationID> slotsToExclude) {
+		return Stream
+			.concat(
+				getAvailableSlotsInformation().stream(),
+				getAllocatedSlotsInformation().stream())
+			.filter(slotInfo -> !slotInfo.willBeOccupiedIndefinitely())
+			.filter(slotInfo -> !slotsToExclude.contains(slotInfo.getAllocationId()))
+			.collect(Collectors.toSet());
+	}
+
+	private static boolean areRequestsFulfillableWithSlots(
+			final Collection<PhysicalSlotRequest> requests,
+			final Set<SlotInfo> slots) {
+
+		final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+		for (PhysicalSlotRequest request : requests) {
+			final Optional<SlotInfo> matchedSlot = findMatchingSlotForRequest(request, remainingSlots);
+			if (matchedSlot.isPresent()) {
+				remainingSlots.remove(matchedSlot.get());
+			} else {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static Optional<SlotInfo> findMatchingSlotForRequest(
+			final PhysicalSlotRequest request,
+			final Collection<SlotInfo> slots) {
+
+		final ResourceProfile requiredResource = request.getSlotProfile().getPhysicalSlotResourceProfile();

Review comment:
       Do we need to keep `PhysicalSlotRequests` in `PhysicalSlotRequestBulk`?
   It seems we only use the ids and the `ResourceProfile` from `PhysicalSlotRequest.SlotProfile`.
   Would it be enough for now to keep only `ResourceProfiles` in `PhysicalSlotRequestBulk.pendingRequests`?
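
A minimal sketch of how the fulfillability check could look if the bulk kept only resource profiles for its pending requests. `Profile` and `canFulfill` are hypothetical stand-ins for Flink's `ResourceProfile` and its matching logic, reduced to one CPU dimension so the example is self-contained; this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Sketch only: Profile is a hypothetical stand-in for Flink's ResourceProfile,
// reduced to a single CPU dimension so the example stays self-contained.
final class BulkFulfillabilitySketch {

    static final class Profile {
        final int cpuCores;

        Profile(int cpuCores) {
            this.cpuCores = cpuCores;
        }

        // True if a slot with this profile can host a request for `required`.
        boolean canFulfill(Profile required) {
            return cpuCores >= required.cpuCores;
        }
    }

    // Greedy first-fit check in the spirit of areRequestsFulfillableWithSlots
    // from the diff: every pending profile must claim a distinct slot.
    static boolean areRequestsFulfillable(
            Collection<Profile> pendingProfiles,
            Collection<Profile> reusableSlots) {

        final List<Profile> remainingSlots = new ArrayList<>(reusableSlots);
        for (Profile required : pendingProfiles) {
            Profile matched = null;
            for (Profile slot : remainingSlots) {
                if (slot.canFulfill(required)) {
                    matched = slot;
                    break;
                }
            }
            if (matched == null) {
                return false;
            }
            // Profile does not override equals, so this removes exactly the matched instance.
            remainingSlots.remove(matched);
        }
        return true;
    }
}
```

Note that first-fit matching in this sketch depends on iteration order, so with heterogeneous profiles it can reject a bulk that a smarter assignment would fulfill.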

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
##########
@@ -96,52 +97,88 @@ public DefaultExecutionSlotAllocator(
 
 			LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
 
-			CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations(
+			final CompletableFuture<SlotProfile> slotProfileFuture = createSlotProfile(
+				schedulingRequirements,
+				Collections.emptySet(),
+				schedulingRequirements.getPhysicalSlotResourceProfile(),
+				allPreviousAllocationIds);
+
+			final CompletableFuture<LogicalSlot> slotFuture = slotProfileFuture.thenCompose(
+				slotProfile ->
+					slotProviderStrategy.allocateSlot(
+						slotRequestId,
+						new ScheduledUnit(
+							executionVertexId,
+							slotSharingGroupId,
+							schedulingRequirements.getCoLocationConstraint()),
+						slotProfile));
+
+			final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+				createAndRegisterSlotExecutionVertexAssignment(
 					executionVertexId,
-					schedulingRequirements.getPreferredLocations(),
-					inputsLocationsRetriever,
-					Collections.emptySet()).thenCompose(
-							(Collection<TaskManagerLocation> preferredLocations) ->
-								slotProviderStrategy.allocateSlot(
-									slotRequestId,
-									new ScheduledUnit(
-										executionVertexId,
-										slotSharingGroupId,
-										schedulingRequirements.getCoLocationConstraint()),
-									SlotProfile.priorAllocation(
-										schedulingRequirements.getTaskResourceProfile(),
-										schedulingRequirements.getPhysicalSlotResourceProfile(),
-										preferredLocations,
-										Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
-										allPreviousAllocationIds)));
-
-			SlotExecutionVertexAssignment slotExecutionVertexAssignment =
-					new SlotExecutionVertexAssignment(executionVertexId, slotFuture);
-			// add to map first to avoid the future completed before added.
-			pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
-
-			slotFuture.whenComplete(
-					(ignored, throwable) -> {
-						pendingSlotAssignments.remove(executionVertexId);
-						if (throwable != null) {
-							slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
-						}
-					});
+					slotFuture,
+					slotRequestId,
+					slotSharingGroupId);
 
 			slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
 		}
 
 		return slotExecutionVertexAssignments;
 	}
 
-	private void validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
+	protected void validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
 		schedulingRequirements.stream()
 			.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
 			.forEach(id -> checkState(
 				!pendingSlotAssignments.containsKey(id),
 				"BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id));
 	}
 
+	protected CompletableFuture<SlotProfile> createSlotProfile(

Review comment:
       Could we introduce `new SlotProfileRetriever(inputsLocationsRetriever, ignoreInternalProducers, useTaskProfile).createSlotProfileFutures(Collection<ExecutionVertexSchedulingRequirements>)`?
   This would deduplicate a lot of code in `OneSlotPerExecutionSlotAllocator/DefaultExecutionSlotAllocator`. 

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -562,4 +575,110 @@ private void releaseSharedSlot(
 	public boolean requiresPreviousExecutionGraphAllocations() {
 		return slotSelectionStrategy instanceof PreviousAllocationSlotSelectionStrategy;
 	}
+
+	@Override
+	public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+			final Collection<PhysicalSlotRequest> physicalSlotRequests,
+			final Time timeout) {
+
+		final PhysicalSlotRequestBulk slotRequestBulk = new PhysicalSlotRequestBulk(physicalSlotRequests);
+
+		final List<CompletableFuture<PhysicalSlotRequest.Result>> resultFutures = new ArrayList<>(physicalSlotRequests.size());
+		for (PhysicalSlotRequest request : physicalSlotRequests) {
+			final CompletableFuture<PhysicalSlotRequest.Result> resultFuture =
+				allocatePhysicalSlot(request, timeout).thenApply(result -> {
+					slotRequestBulk.markRequestFulfilled(
+						result.getSlotRequestId(),
+						result.getPhysicalSlot().getAllocationId());
+
+					return result;
+				});
+			resultFutures.add(resultFuture);
+		}
+
+		slotRequestBulkTracker.track(slotRequestBulk);
+		schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
+
+		return FutureUtils.combineAll(resultFutures)
+			.whenComplete((ignore, throwable) -> slotRequestBulkTracker.untrack(slotRequestBulk));
+	}
+
+	private CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
+			final PhysicalSlotRequest physicalSlotRequest,
+			final Time timeout) {
+
+		final SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+		final SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
+
+		final Optional<SlotAndLocality> availablePhysicalSlot = tryAllocateFromAvailable(slotRequestId, slotProfile);
+
+		final CompletableFuture<PhysicalSlot> slotFuture;
+		if (availablePhysicalSlot.isPresent()) {
+			slotFuture = CompletableFuture.completedFuture(availablePhysicalSlot.get().getSlot());
+		} else if (physicalSlotRequest.willSlotBeOccupiedIndefinitely()) {
+			slotFuture = slotPool.requestNewAllocatedSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), timeout);
+		} else {
+			slotFuture = slotPool.requestNewAllocatedBatchSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile());

Review comment:
       These requests to `SlotPoolImpl` will also schedule a per-slot timeout for streaming and an additional fulfillability check for batch slots in `SlotPoolImpl`. Do we want to keep this complication, especially in the future? The bulk timeout seems to be enough and basically addresses the same concern.
   
   Would it be easier to add e.g. `SlotPoolImpl#requestNewAllocatedSlotWithoutTimeout`? `requestNewAllocatedSlotWithoutTimeout` could be similar to `requestNewAllocatedSlot` for streaming but without any per-slot timeout tracking in `SlotPoolImpl`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -562,4 +575,110 @@ private void releaseSharedSlot(
 	public boolean requiresPreviousExecutionGraphAllocations() {
 		return slotSelectionStrategy instanceof PreviousAllocationSlotSelectionStrategy;
 	}
+
+	@Override
+	public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+			final Collection<PhysicalSlotRequest> physicalSlotRequests,
+			final Time timeout) {
+
+		final PhysicalSlotRequestBulk slotRequestBulk = new PhysicalSlotRequestBulk(physicalSlotRequests);
+
+		final List<CompletableFuture<PhysicalSlotRequest.Result>> resultFutures = new ArrayList<>(physicalSlotRequests.size());
+		for (PhysicalSlotRequest request : physicalSlotRequests) {
+			final CompletableFuture<PhysicalSlotRequest.Result> resultFuture =
+				allocatePhysicalSlot(request, timeout).thenApply(result -> {
+					slotRequestBulk.markRequestFulfilled(
+						result.getSlotRequestId(),
+						result.getPhysicalSlot().getAllocationId());
+
+					return result;
+				});
+			resultFutures.add(resultFuture);
+		}
+
+		slotRequestBulkTracker.track(slotRequestBulk);
+		schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
+
+		return FutureUtils.combineAll(resultFutures)
+			.whenComplete((ignore, throwable) -> slotRequestBulkTracker.untrack(slotRequestBulk));
+	}
+
+	private CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
+			final PhysicalSlotRequest physicalSlotRequest,
+			final Time timeout) {
+
+		final SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+		final SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
+
+		final Optional<SlotAndLocality> availablePhysicalSlot = tryAllocateFromAvailable(slotRequestId, slotProfile);

Review comment:
       It looks like `componentMainThreadExecutor/tryAllocateFromAvailable/cancelSlotRequest` are the only things we reuse from `SchedulerImpl`.
   I am not sure whether it makes sense to extend this class, which has not been designed for bulk slot allocation.
   Do you think we can reuse `SchedulerImpl` in the future?
   Could we just introduce another component for the change in this commit (basically the bulk timeout), e.g. a `BulkSlotProvider.allocatePhysicalSlots/cancelSlotRequest` interface or so?
   Would just duplicating `tryAllocateFromAvailable/cancelSlotRequest` in `BulkSlotProviderImpl` cause less confusion in the future?
   Will we eventually need a single slot provider for pipelined region scheduling at all?
   Later we could see how to introduce slot sharing and co-location concerns, maybe as other components on top.
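
A minimal sketch of what such a separate component could look like. Request ids and slots are plain strings here, and `FixedPoolBulkSlotProvider` is a hypothetical toy implementation that fails an unfulfillable bulk immediately instead of waiting for a bulk timeout; the real interface would presumably use `PhysicalSlotRequest`, `PhysicalSlotRequest.Result` and `SlotRequestId`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch only: all names and signatures below are assumptions for illustration.
final class BulkSlotProviderSketch {

    interface BulkSlotProvider {
        // Completes normally only if every request in the bulk is fulfilled.
        CompletableFuture<Collection<String>> allocatePhysicalSlots(Collection<String> requestIds);

        void cancelSlotRequest(String requestId, Throwable cause);
    }

    // Toy implementation backed by a fixed set of free slots.
    static final class FixedPoolBulkSlotProvider implements BulkSlotProvider {
        private final Deque<String> freeSlots = new ArrayDeque<>();
        private final Map<String, String> slotByRequest = new HashMap<>();

        FixedPoolBulkSlotProvider(Collection<String> slots) {
            freeSlots.addAll(slots);
        }

        @Override
        public CompletableFuture<Collection<String>> allocatePhysicalSlots(Collection<String> requestIds) {
            final CompletableFuture<Collection<String>> result = new CompletableFuture<>();
            if (requestIds.size() > freeSlots.size()) {
                // All-or-nothing: never hand out a partial bulk.
                result.completeExceptionally(new IllegalStateException("Bulk is not fulfillable."));
                return result;
            }
            final List<String> allocated = new ArrayList<>();
            for (String requestId : requestIds) {
                final String slot = freeSlots.poll();
                slotByRequest.put(requestId, slot);
                allocated.add(slot);
            }
            result.complete(allocated);
            return result;
        }

        @Override
        public void cancelSlotRequest(String requestId, Throwable cause) {
            // Return the slot of a cancelled request to the free pool, if it had one.
            final String slot = slotByRequest.remove(requestId);
            if (slot != null) {
                freeSlots.add(slot);
            }
        }

        int numberOfFreeSlots() {
            return freeSlots.size();
        }
    }
}
```

Keeping the interface down to these two methods would leave slot sharing and co-location out of the component, so they could be layered on top later.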

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -469,6 +469,47 @@ public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwabl
 			.collect(Collectors.toList());
 	}
 
+	@Override
+	public boolean isSlotRequestBulkFulfillable(final PhysicalSlotRequestBulk slotRequestBulk) {

Review comment:
       Do we need to mix this bulk allocation concern into `SlotPool`?
   Could we keep the `SlotPool` as an allocator of single physical slots only?
   Would it not be more natural to expose `SlotPool#getAllocatedSlotsInformation` and move the bulk logic to its tracker?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SlotProviderStrategy.java
##########
@@ -56,6 +62,24 @@
 		ScheduledUnit scheduledUnit,
 		SlotProfile slotProfile);
 
+	/**
+	 * Allocates a bulk of physical slots.
+	 *
+	 * @param physicalSlotRequests requests for physical slots
+	 * @return future of the allocation which will be completed normally only when all the requests are fulfilled
+	 */
+	public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(

Review comment:
       Do we need `SlotProviderStrategy` in `OneSlotPerExecutionSlotAllocator` at all?
   Could we directly use the new `BulkSlotProvider.allocatePhysicalSlots/cancelSlotRequest` and create `OneSlotPerExecutionSlotAllocator` with a `willSlotBeOccupiedIndefinitely` flag based on `ScheduleMode`?
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * <p>1. Slot sharing will be ignored.
+ *
+ * <p>2. Co-location constraints are not allowed.
+ *
+ * <p>3. Intra-bulk input location preferences will be ignored.
+ */
+public class OneSlotPerExecutionSlotAllocator extends DefaultExecutionSlotAllocator {

Review comment:
       `OneSlotPerExecutionSlotAllocator` and `DefaultExecutionSlotAllocator` implement different approaches.
   Would it make sense to introduce a base `AbstractExecutionSlotAllocator` and have both classes extend it?
   There will not be much in common, mostly some state management around `pendingSlotAssignments`, if we:
   - introduce a separate `SlotProfileRetriever`
   - use `BulkSlotProvider` and the `willSlotBeOccupiedIndefinitely` flag in `OneSlotPerExecutionSlotAllocator` instead of `slotProviderStrategy`
   - maybe move static methods to utils if it makes sense
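
A minimal sketch of the state such a base class could own. Vertex ids and slots are plain strings, and all class and method names are hypothetical; only the `pendingSlotAssignments` bookkeeping mirrors what the diff shows.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch only: names are assumptions and only illustrate the shared state management.
abstract class AbstractExecutionSlotAllocatorSketch {

    private final Map<String, CompletableFuture<String>> pendingSlotAssignments = new HashMap<>();

    // Subclasses decide how slots are requested (one by one or in bulks).
    abstract Map<String, CompletableFuture<String>> allocateSlotsFor(Collection<String> vertexIds);

    void validateNotPending(Collection<String> vertexIds) {
        for (String id : vertexIds) {
            if (pendingSlotAssignments.containsKey(id)) {
                throw new IllegalStateException(
                    "BUG: vertex " + id + " tries to allocate a slot when its previous slot request is still pending");
            }
        }
    }

    CompletableFuture<String> registerAssignment(String vertexId, CompletableFuture<String> slotFuture) {
        // Add to the map first so that an already completed future is still cleaned up.
        pendingSlotAssignments.put(vertexId, slotFuture);
        slotFuture.whenComplete((slot, failure) -> pendingSlotAssignments.remove(vertexId));
        return slotFuture;
    }

    int numberOfPendingAssignments() {
        return pendingSlotAssignments.size();
    }
}

// Toy subclass: registers one uncompleted future per vertex.
final class OneSlotPerVertexAllocatorSketch extends AbstractExecutionSlotAllocatorSketch {

    @Override
    Map<String, CompletableFuture<String>> allocateSlotsFor(Collection<String> vertexIds) {
        validateNotPending(vertexIds);
        final Map<String, CompletableFuture<String>> assignments = new HashMap<>();
        for (String id : vertexIds) {
            assignments.put(id, registerAssignment(id, new CompletableFuture<>()));
        }
        return assignments;
    }
}
```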

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * <p>1. Slot sharing will be ignored.
+ *
+ * <p>2. Co-location constraints are not allowed.
+ *
+ * <p>3. Intra-bulk input location preferences will be ignored.
+ */
+public class OneSlotPerExecutionSlotAllocator extends DefaultExecutionSlotAllocator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+	private final SlotOwner slotOwner;
+
+	public OneSlotPerExecutionSlotAllocator(
+			final SlotProviderStrategy slotProviderStrategy,
+			final InputsLocationsRetriever inputsLocationsRetriever) {
+		super(slotProviderStrategy, inputsLocationsRetriever);
+
+		this.slotOwner = new OneSlotPerExecutionSlotAllocatorSlotOwner();
+	}
+
+	@Override
+	public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+			final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+		validateSchedulingRequirements(executionVertexSchedulingRequirements);
+
+		validateNoCoLocationConstraint(executionVertexSchedulingRequirements);
+
+		final Set<ExecutionVertexID> allExecutionVertexIds = executionVertexSchedulingRequirements.stream()
+			.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+			.collect(Collectors.toSet());
+
+		final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds =
+			generateExecutionVertexSlotRequestIds(allExecutionVertexIds);
+
+		final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			createSlotExecutionVertexAssignments(executionVertexSchedulingRequirements, executionVertexSlotRequestIds);
+
+		final List<CompletableFuture<PhysicalSlotRequest>> physicalSlotRequestFutures =
+			createPhysicalSlotRequestFutures(
+				executionVertexSchedulingRequirements,
+				allExecutionVertexIds,
+				executionVertexSlotRequestIds);
+
+		allocateSlotsForAssignments(
+			physicalSlotRequestFutures,
+			slotExecutionVertexAssignments,
+			executionVertexSlotRequestIds);
+
+		return slotExecutionVertexAssignments;
+	}
+
+	private static void validateNoCoLocationConstraint(
+			final Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
+
+		final boolean hasCoLocationConstraint = schedulingRequirements.stream()
+			.anyMatch(r -> r.getCoLocationConstraint() != null);
+		checkState(
+			!hasCoLocationConstraint,
+			"Jobs with co-location constraints are not allowed to run with pipelined region scheduling strategy.");
+	}
+
+	private Map<ExecutionVertexID, SlotRequestId> generateExecutionVertexSlotRequestIds(
+			final Set<ExecutionVertexID> allExecutionVertexIds) {
+
+		final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds = new HashMap<>();
+		for (ExecutionVertexID executionVertexId : allExecutionVertexIds) {
+			executionVertexSlotRequestIds.put(executionVertexId, new SlotRequestId());
+		}
+		return executionVertexSlotRequestIds;
+	}
+
+	private List<SlotExecutionVertexAssignment> createSlotExecutionVertexAssignments(
+			final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements,
+			final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds) {
+
+		final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			new ArrayList<>(executionVertexSchedulingRequirements.size());
+		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
+			final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
+			final SlotRequestId slotRequestId = executionVertexSlotRequestIds.get(executionVertexId);
+
+			final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+				createAndRegisterSlotExecutionVertexAssignment(
+					executionVertexId,
+					new CompletableFuture<>(),
+					slotRequestId,
+					null);
+			slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
+		}
+
+		return slotExecutionVertexAssignments;
+	}
+
+	private List<CompletableFuture<PhysicalSlotRequest>> createPhysicalSlotRequestFutures(
+			final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements,
+			final Set<ExecutionVertexID> allExecutionVertexIds,
+			final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds) {
+
+		final Set<AllocationID> allPreviousAllocationIds =
+			computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
+
+		final List<CompletableFuture<PhysicalSlotRequest>> physicalSlotRequestFutures =
+			new ArrayList<>(executionVertexSchedulingRequirements.size());
+		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
+			final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
+			final SlotRequestId slotRequestId = executionVertexSlotRequestIds.get(executionVertexId);
+
+			LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
+
+			// use the task resource profile as the physical slot resource requirement since slot sharing is ignored here
+			final ResourceProfile physicalSlotResourceProfile = schedulingRequirements.getTaskResourceProfile();
+
+			final CompletableFuture<SlotProfile> slotProfileFuture = createSlotProfile(
+				schedulingRequirements,
+				allExecutionVertexIds,
+				physicalSlotResourceProfile,
+				allPreviousAllocationIds);
+			final CompletableFuture<PhysicalSlotRequest> physicalSlotRequestFuture =
+				slotProfileFuture.thenApply(
+					slotProfile -> createPhysicalSlotRequest(slotRequestId, slotProfile));
+			physicalSlotRequestFutures.add(physicalSlotRequestFuture);
+		}
+
+		return physicalSlotRequestFutures;
+	}
+
+	private PhysicalSlotRequest createPhysicalSlotRequest(
+			final SlotRequestId slotRequestId,
+			final SlotProfile slotProfile) {
+		return new PhysicalSlotRequest(slotRequestId, slotProfile, slotProviderStrategy.willSlotBeOccupiedIndefinitely());
+	}
+
+	private void allocateSlotsForAssignments(
+			final List<CompletableFuture<PhysicalSlotRequest>> physicalSlotRequestFutures,
+			final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments,
+			final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds) {
+
+		final Map<ExecutionVertexID, SlotExecutionVertexAssignment> vertexToAssignmentMapping =
+			slotExecutionVertexAssignments.stream()
+				.collect(Collectors.toMap(SlotExecutionVertexAssignment::getExecutionVertexId, Function.identity()));
+
+		final Map<SlotRequestId, ExecutionVertexID> slotToVertexMapping = new HashMap<>();
+		for (Map.Entry<ExecutionVertexID, SlotRequestId> entry : executionVertexSlotRequestIds.entrySet()) {
+			slotToVertexMapping.put(entry.getValue(), entry.getKey());
+		}
+
+		FutureUtils.combineAll(physicalSlotRequestFutures)
+			.thenCompose(physicalSlotRequests -> slotProviderStrategy.allocatePhysicalSlots(physicalSlotRequests))
+			.thenAccept(physicalSlotRequestResults -> {
+				for (PhysicalSlotRequest.Result result : physicalSlotRequestResults) {
+					final SlotRequestId slotRequestId = result.getSlotRequestId();
+					final ExecutionVertexID executionVertexId = slotToVertexMapping.get(slotRequestId);
+					try {
+						final LogicalSlot logicalSlot = allocateSingleLogicalSlotFromPhysicalSlot(
+							slotRequestId,
+							result.getPhysicalSlot());
+						vertexToAssignmentMapping.get(executionVertexId).getLogicalSlotFuture().complete(logicalSlot);
+					} catch (Exception ex) {
+						throw new CompletionException(ex);
+					}
+				}
+			})
+			.exceptionally(ex -> {
+				executionVertexSlotRequestIds.values().stream()
+					.forEach(slotRequestId -> {
+						final ExecutionVertexID executionVertexId = slotToVertexMapping.get(slotRequestId);
+						vertexToAssignmentMapping.get(executionVertexId).getLogicalSlotFuture().completeExceptionally(ex);
+					});
+				return null;
+			});
+	}
+
+	private LogicalSlot allocateSingleLogicalSlotFromPhysicalSlot(
+			final SlotRequestId slotRequestId,
+			final PhysicalSlot physicalSlot) throws FlinkException {
+
+		final SingleLogicalSlot singleTaskSlot = new SingleLogicalSlot(
+			slotRequestId,
+			physicalSlot,
+			null,
+			Locality.UNKNOWN,
+			slotOwner);
+
+		if (physicalSlot.tryAssignPayload(singleTaskSlot)) {
+			return singleTaskSlot;
+		} else {
+			throw new FlinkException("Could not assign payload to allocated slot " + physicalSlot.getAllocationId() + '.');

Review comment:
       Should we also release the slot from `SlotPool`?
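
A minimal sketch of releasing the slot before the failure is propagated. `Slot` and `SlotReleaser` are hypothetical stand-ins for `PhysicalSlot` and the releasing side of `SlotPool`, and an unchecked exception replaces `FlinkException` to keep the example self-contained.

```java
// Sketch only: all types below are assumptions, not the Flink classes.
final class ReleaseOnFailedAssignmentSketch {

    // Hypothetical stand-in for the part of SlotPool that takes a slot back.
    interface SlotReleaser {
        void releaseSlot(String slotRequestId, Throwable cause);
    }

    // Hypothetical stand-in for PhysicalSlot: it accepts at most one payload.
    static final class Slot {
        private Object payload;

        boolean tryAssignPayload(Object newPayload) {
            if (payload != null) {
                return false;
            }
            payload = newPayload;
            return true;
        }
    }

    static Object assignPayloadOrRelease(
            String slotRequestId,
            Slot slot,
            Object payload,
            SlotReleaser releaser) {

        if (slot.tryAssignPayload(payload)) {
            return payload;
        }
        final IllegalStateException cause = new IllegalStateException(
            "Could not assign payload to allocated slot for request " + slotRequestId + '.');
        // Hand the slot back before propagating the failure so it is not leaked.
        releaser.releaseSlot(slotRequestId, cause);
        throw cause;
    }
}
```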

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -562,4 +575,110 @@ private void releaseSharedSlot(
 	public boolean requiresPreviousExecutionGraphAllocations() {
 		return slotSelectionStrategy instanceof PreviousAllocationSlotSelectionStrategy;
 	}
+
+	@Override
+	public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+			final Collection<PhysicalSlotRequest> physicalSlotRequests,
+			final Time timeout) {
+
+		final PhysicalSlotRequestBulk slotRequestBulk = new PhysicalSlotRequestBulk(physicalSlotRequests);
+
+		final List<CompletableFuture<PhysicalSlotRequest.Result>> resultFutures = new ArrayList<>(physicalSlotRequests.size());
+		for (PhysicalSlotRequest request : physicalSlotRequests) {
+			final CompletableFuture<PhysicalSlotRequest.Result> resultFuture =
+				allocatePhysicalSlot(request, timeout).thenApply(result -> {
+					slotRequestBulk.markRequestFulfilled(
+						result.getSlotRequestId(),
+						result.getPhysicalSlot().getAllocationId());
+
+					return result;
+				});
+			resultFutures.add(resultFuture);
+		}
+
+		slotRequestBulkTracker.track(slotRequestBulk);
+		schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
+
+		return FutureUtils.combineAll(resultFutures)
+			.whenComplete((ignore, throwable) -> slotRequestBulkTracker.untrack(slotRequestBulk));
+	}
+
+	private CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
+			final PhysicalSlotRequest physicalSlotRequest,
+			final Time timeout) {
+
+		final SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+		final SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
+
+		final Optional<SlotAndLocality> availablePhysicalSlot = tryAllocateFromAvailable(slotRequestId, slotProfile);
+
+		final CompletableFuture<PhysicalSlot> slotFuture;
+		if (availablePhysicalSlot.isPresent()) {
+			slotFuture = CompletableFuture.completedFuture(availablePhysicalSlot.get().getSlot());
+		} else if (physicalSlotRequest.willSlotBeOccupiedIndefinitely()) {
+			slotFuture = slotPool.requestNewAllocatedSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), timeout);
+		} else {
+			slotFuture = slotPool.requestNewAllocatedBatchSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile());

Review comment:
       Yes, this is what I also thought about.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
##########
@@ -96,52 +97,88 @@ public DefaultExecutionSlotAllocator(
 
 			LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
 
-			CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations(
+			final CompletableFuture<SlotProfile> slotProfileFuture = createSlotProfile(
+				schedulingRequirements,
+				Collections.emptySet(),
+				schedulingRequirements.getPhysicalSlotResourceProfile(),
+				allPreviousAllocationIds);
+
+			final CompletableFuture<LogicalSlot> slotFuture = slotProfileFuture.thenCompose(
+				slotProfile ->
+					slotProviderStrategy.allocateSlot(
+						slotRequestId,
+						new ScheduledUnit(
+							executionVertexId,
+							slotSharingGroupId,
+							schedulingRequirements.getCoLocationConstraint()),
+						slotProfile));
+
+			final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+				createAndRegisterSlotExecutionVertexAssignment(
 					executionVertexId,
-					schedulingRequirements.getPreferredLocations(),
-					inputsLocationsRetriever,
-					Collections.emptySet()).thenCompose(
-							(Collection<TaskManagerLocation> preferredLocations) ->
-								slotProviderStrategy.allocateSlot(
-									slotRequestId,
-									new ScheduledUnit(
-										executionVertexId,
-										slotSharingGroupId,
-										schedulingRequirements.getCoLocationConstraint()),
-									SlotProfile.priorAllocation(
-										schedulingRequirements.getTaskResourceProfile(),
-										schedulingRequirements.getPhysicalSlotResourceProfile(),
-										preferredLocations,
-										Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
-										allPreviousAllocationIds)));
-
-			SlotExecutionVertexAssignment slotExecutionVertexAssignment =
-					new SlotExecutionVertexAssignment(executionVertexId, slotFuture);
-			// add to map first to avoid the future completed before added.
-			pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
-
-			slotFuture.whenComplete(
-					(ignored, throwable) -> {
-						pendingSlotAssignments.remove(executionVertexId);
-						if (throwable != null) {
-							slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
-						}
-					});
+					slotFuture,
+					slotRequestId,
+					slotSharingGroupId);
 
 			slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
 		}
 
 		return slotExecutionVertexAssignments;
 	}
 
-	private void validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
+	protected void validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
 		schedulingRequirements.stream()
 			.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
 			.forEach(id -> checkState(
 				!pendingSlotAssignments.containsKey(id),
 				"BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id));
 	}
 
+	protected CompletableFuture<SlotProfile> createSlotProfile(

Review comment:
       True, there is no duplication at the moment.
   I meant moving the `SlotProfileRetriever` concern (and all related code) into a separate component so that both `OneSlotPerExecutionSlotAllocator` and `DefaultExecutionSlotAllocator` can reuse it. This would simplify both allocators, imo, and also avoid any code duplication.
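
   A rough sketch of that shape, under stated assumptions: every type below (`SketchSlotProfile`, `SketchSlotProfileRetriever`, the two `Sketch...Allocator` classes) is a simplified, hypothetical stand-in and not the real Flink signature. The point is only that both allocators get the retriever injected instead of sharing a `protected createSlotProfile` method through inheritance.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Flink's SlotProfile; only the fields this sketch needs.
final class SketchSlotProfile {
    final String executionVertexId;
    final Set<String> previousAllocationIds;

    SketchSlotProfile(String executionVertexId, Set<String> previousAllocationIds) {
        this.executionVertexId = executionVertexId;
        this.previousAllocationIds = previousAllocationIds;
    }
}

// Hypothetical shared component: the only place that knows how to build a slot profile.
interface SketchSlotProfileRetriever {
    CompletableFuture<SketchSlotProfile> getSlotProfileFuture(
        String executionVertexId,
        Set<String> allPreviousAllocationIds);
}

// Both allocators take the retriever as a dependency and only add their own allocation step.
final class SketchDefaultAllocator {
    private final SketchSlotProfileRetriever retriever;

    SketchDefaultAllocator(SketchSlotProfileRetriever retriever) {
        this.retriever = retriever;
    }

    CompletableFuture<String> allocate(String executionVertexId) {
        return retriever
            .getSlotProfileFuture(executionVertexId, Collections.emptySet())
            .thenApply(profile -> "shared-slot-for-" + profile.executionVertexId);
    }
}

final class SketchOneSlotPerExecutionAllocator {
    private final SketchSlotProfileRetriever retriever;

    SketchOneSlotPerExecutionAllocator(SketchSlotProfileRetriever retriever) {
        this.retriever = retriever;
    }

    CompletableFuture<String> allocate(String executionVertexId) {
        return retriever
            .getSlotProfileFuture(executionVertexId, Collections.emptySet())
            .thenApply(profile -> "dedicated-slot-for-" + profile.executionVertexId);
    }
}

final class SlotProfileRetrieverSketchDemo {
    public static void main(String[] args) {
        // A trivial retriever that completes immediately; a real one would wait for input locations.
        final SketchSlotProfileRetriever retriever = (vertexId, previousIds) ->
            CompletableFuture.completedFuture(new SketchSlotProfile(vertexId, previousIds));

        System.out.println(new SketchDefaultAllocator(retriever).allocate("v1").join());
        System.out.println(new SketchOneSlotPerExecutionAllocator(retriever).allocate("v1").join());
    }
}
```

   With this split, neither allocator needs to extend the other, and the retriever can be tested on its own.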



