You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/11 15:06:17 UTC

[flink] 09/10: [FLINK-17017][runtime] Introduce BulkSlotProvider which allocates physical slots in bulks

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 044def9b50fe4249eaa3f1472981404425a11fd9
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jun 5 18:40:16 2020 +0800

    [FLINK-17017][runtime] Introduce BulkSlotProvider which allocates physical slots in bulks
---
 .../jobmaster/slotpool/BulkSlotProvider.java       |  60 ++++++
 .../jobmaster/slotpool/BulkSlotProviderImpl.java   | 213 ++++++++++++++++++++
 .../jobmaster/slotpool/PhysicalSlotRequest.java    |  79 ++++++++
 .../slotpool/PhysicalSlotRequestBulk.java          |  79 ++++++++
 .../slotpool/PhysicalSlotRequestBulkChecker.java   | 148 ++++++++++++++
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |   4 +-
 .../runtime/jobmaster/slotpool/SlotPoolImpl.java   |   2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   5 +
 .../slotpool/BulkSlotProviderImplTest.java         | 214 +++++++++++++++++++++
 .../PhysicalSlotRequestBulkCheckerTest.java        | 209 ++++++++++++++++++++
 .../slotpool/PhysicalSlotRequestBulkTest.java      |  61 ++++++
 .../jobmaster/slotpool/TestingSlotPoolImpl.java    |   4 +
 12 files changed, 1074 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java
new file mode 100644
index 0000000..cbcedf9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The bulk slot provider serves physical slot requests.
+ */
+public interface BulkSlotProvider {
+
+	/**
+	 * Starts the slot provider by initializing the main thread executor.
+	 *
+	 * @param mainThreadExecutor the main thread executor of the job master
+	 */
+	void start(ComponentMainThreadExecutor mainThreadExecutor);
+
+	/**
+	 * Allocates a bulk of physical slots. The allocation will be completed
+	 * normally only when all the requests are fulfilled.
+	 *
+	 * @param physicalSlotRequests requests for physical slots
+	 * @param timeout indicating how long it is accepted that the slot requests can be unfulfillable
+	 * @return future of the results of slot requests
+	 */
+	CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+		Collection<PhysicalSlotRequest> physicalSlotRequests,
+		Time timeout);
+
+	/**
+	 * Cancels the slot request with the given {@link SlotRequestId}. If the request is already fulfilled
+	 * with a physical slot, the slot will be released.
+	 *
+	 * @param slotRequestId identifying the slot request to cancel
+	 * @param cause of the cancellation
+	 */
+	void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
new file mode 100644
index 0000000..b380999
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link BulkSlotProvider}.
+ */
+class BulkSlotProviderImpl implements BulkSlotProvider {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BulkSlotProviderImpl.class);
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	private final SlotSelectionStrategy slotSelectionStrategy;
+
+	private final SlotPool slotPool;
+
+	private final PhysicalSlotRequestBulkChecker slotRequestBulkChecker;
+
+	BulkSlotProviderImpl(final SlotSelectionStrategy slotSelectionStrategy, final SlotPool slotPool) {
+		this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
+		this.slotPool = checkNotNull(slotPool);
+
+		this.slotRequestBulkChecker = new PhysicalSlotRequestBulkChecker(
+			this::getAllSlotInfos,
+			SystemClock.getInstance());
+
+		this.componentMainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
+			"Scheduler is not initialized with proper main thread executor. " +
+				"Call to BulkSlotProvider.start(...) required.");
+	}
+
+	@Override
+	public void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+		this.componentMainThreadExecutor = mainThreadExecutor;
+
+		slotPool.disableBatchSlotRequestTimeoutCheck();
+	}
+
+	@Override
+	public void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause) {
+		componentMainThreadExecutor.assertRunningInMainThread();
+
+		slotPool.releaseSlot(slotRequestId, cause);
+	}
+
+	@Override
+	public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+			final Collection<PhysicalSlotRequest> physicalSlotRequests,
+			final Time timeout) {
+
+		componentMainThreadExecutor.assertRunningInMainThread();
+
+		LOG.debug("Received {} slot requests.", physicalSlotRequests.size());
+
+		final PhysicalSlotRequestBulk slotRequestBulk =
+			slotRequestBulkChecker.createPhysicalSlotRequestBulk(physicalSlotRequests);
+
+		final List<CompletableFuture<PhysicalSlotRequest.Result>> resultFutures = new ArrayList<>(physicalSlotRequests.size());
+		for (PhysicalSlotRequest request : physicalSlotRequests) {
+			final CompletableFuture<PhysicalSlotRequest.Result> resultFuture =
+				allocatePhysicalSlot(request).thenApply(result -> {
+					slotRequestBulk.markRequestFulfilled(
+						result.getSlotRequestId(),
+						result.getPhysicalSlot().getAllocationId());
+
+					return result;
+				});
+			resultFutures.add(resultFuture);
+		}
+
+		schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
+
+		return FutureUtils.combineAll(resultFutures);
+	}
+
+	private CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
+			final PhysicalSlotRequest physicalSlotRequest) {
+
+		final SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+		final SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
+		final ResourceProfile resourceProfile = slotProfile.getPhysicalSlotResourceProfile();
+
+		LOG.debug("Received slot request [{}] with resource requirements: {}", slotRequestId, resourceProfile);
+
+		final Optional<PhysicalSlot> availablePhysicalSlot = tryAllocateFromAvailable(slotRequestId, slotProfile);
+
+		final CompletableFuture<PhysicalSlot> slotFuture;
+		if (availablePhysicalSlot.isPresent()) {
+			slotFuture = CompletableFuture.completedFuture(availablePhysicalSlot.get());
+		} else {
+			slotFuture = requestNewSlot(
+				slotRequestId,
+				resourceProfile,
+				physicalSlotRequest.willSlotBeOccupiedIndefinitely());
+		}
+
+		return slotFuture.thenApply(physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot));
+	}
+
+	private Optional<PhysicalSlot> tryAllocateFromAvailable(
+			final SlotRequestId slotRequestId,
+			final SlotProfile slotProfile) {
+
+		final Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfoList =
+			slotPool.getAvailableSlotsInformation()
+				.stream()
+				.map(SlotSelectionStrategy.SlotInfoAndResources::fromSingleSlot)
+				.collect(Collectors.toList());
+
+		final Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot =
+			slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);
+
+		return selectedAvailableSlot.flatMap(
+			slotInfoAndLocality -> slotPool.allocateAvailableSlot(
+				slotRequestId,
+				slotInfoAndLocality.getSlotInfo().getAllocationId())
+		);
+	}
+
+	private CompletableFuture<PhysicalSlot> requestNewSlot(
+			final SlotRequestId slotRequestId,
+			final ResourceProfile resourceProfile,
+			final boolean willSlotBeOccupiedIndefinitely) {
+
+		if (willSlotBeOccupiedIndefinitely) {
+			return slotPool.requestNewAllocatedSlot(slotRequestId, resourceProfile, null);
+		} else {
+			return slotPool.requestNewAllocatedBatchSlot(slotRequestId, resourceProfile);
+		}
+	}
+
+	private void schedulePendingRequestBulkTimeoutCheck(
+			final PhysicalSlotRequestBulk slotRequestBulk,
+			final Time timeout) {
+
+		componentMainThreadExecutor.schedule(() -> {
+			final PhysicalSlotRequestBulkChecker.TimeoutCheckResult result =
+				slotRequestBulkChecker.checkPhysicalSlotRequestBulkTimeout(slotRequestBulk, timeout);
+
+			switch (result) {
+				case PENDING:
+					//re-schedule the timeout check
+					schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
+					break;
+				case TIMEOUT:
+					timeoutSlotRequestBulk(slotRequestBulk);
+					break;
+				default: // no action to take
+			}
+		}, timeout.getSize(), timeout.getUnit());
+	}
+
+	private void timeoutSlotRequestBulk(final PhysicalSlotRequestBulk slotRequestBulk) {
+		final Exception cause = new TimeoutException("Slot request bulk is not fulfillable!");
+		// pending requests must be canceled first otherwise they might be fulfilled by
+		// allocated slots released from this bulk
+		for (SlotRequestId slotRequestId : slotRequestBulk.getPendingRequests().keySet()) {
+			cancelSlotRequest(slotRequestId, cause);
+		}
+		for (SlotRequestId slotRequestId : slotRequestBulk.getFulfilledRequests().keySet()) {
+			cancelSlotRequest(slotRequestId, cause);
+		}
+	}
+
+	private Set<SlotInfo> getAllSlotInfos() {
+		return Stream
+			.concat(
+				slotPool.getAvailableSlotsInformation().stream(),
+				slotPool.getAllocatedSlotsInformation().stream())
+			.collect(Collectors.toSet());
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
new file mode 100644
index 0000000..b953e43
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+/**
+ * Represents a request for a physical slot.
+ */
+public class PhysicalSlotRequest {
+
+	private final SlotRequestId slotRequestId;
+
+	private final SlotProfile slotProfile;
+
+	private final boolean slotWillBeOccupiedIndefinitely;
+
+	public PhysicalSlotRequest(
+			final SlotRequestId slotRequestId,
+			final SlotProfile slotProfile,
+			final boolean slotWillBeOccupiedIndefinitely) {
+
+		this.slotRequestId = slotRequestId;
+		this.slotProfile = slotProfile;
+		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+	}
+
+	SlotRequestId getSlotRequestId() {
+		return slotRequestId;
+	}
+
+	SlotProfile getSlotProfile() {
+		return slotProfile;
+	}
+
+	boolean willSlotBeOccupiedIndefinitely() {
+		return slotWillBeOccupiedIndefinitely;
+	}
+
+	/**
+	 * Result of a {@link PhysicalSlotRequest}.
+	 */
+	public static class Result {
+
+		private final SlotRequestId slotRequestId;
+
+		private final PhysicalSlot physicalSlot;
+
+		Result(final SlotRequestId slotRequestId, final PhysicalSlot physicalSlot) {
+			this.slotRequestId = slotRequestId;
+			this.physicalSlot = physicalSlot;
+		}
+
+		public SlotRequestId getSlotRequestId() {
+			return slotRequestId;
+		}
+
+		public PhysicalSlot getPhysicalSlot() {
+			return physicalSlot;
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
new file mode 100644
index 0000000..0d63a3b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a bulk of physical slot requests.
+ */
+class PhysicalSlotRequestBulk {
+
+	private final Map<SlotRequestId, ResourceProfile> pendingRequests;
+
+	private final Map<SlotRequestId, AllocationID> fulfilledRequests = new HashMap<>();
+
+	private long unfulfillableTimestamp = Long.MAX_VALUE;
+
+	PhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest> physicalSlotRequests) {
+		this.pendingRequests = physicalSlotRequests.stream()
+			.collect(Collectors.toMap(
+				PhysicalSlotRequest::getSlotRequestId,
+				r -> r.getSlotProfile().getPhysicalSlotResourceProfile()));
+	}
+
+	void markRequestFulfilled(final SlotRequestId slotRequestId, final AllocationID allocationID) {
+		pendingRequests.remove(slotRequestId);
+		fulfilledRequests.put(slotRequestId, allocationID);
+	}
+
+	Map<SlotRequestId, ResourceProfile> getPendingRequests() {
+		return Collections.unmodifiableMap(pendingRequests);
+	}
+
+	Map<SlotRequestId, AllocationID> getFulfilledRequests() {
+		return Collections.unmodifiableMap(fulfilledRequests);
+	}
+
+	void markFulfillable() {
+		unfulfillableTimestamp = Long.MAX_VALUE;
+	}
+
+	void markUnfulfillable(final long currentTimestamp) {
+		if (isFulfillable()) {
+			unfulfillableTimestamp = currentTimestamp;
+		}
+	}
+
+	long getUnfulfillableSince() {
+		return unfulfillableTimestamp;
+	}
+
+	private boolean isFulfillable() {
+		return unfulfillableTimestamp == Long.MAX_VALUE;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
new file mode 100644
index 0000000..98ac706
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class helps to check the status of physical slot request bulks.
+ */
+class PhysicalSlotRequestBulkChecker {
+
+	private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+	private final Clock clock;
+
+	PhysicalSlotRequestBulkChecker(final Supplier<Set<SlotInfo>> slotsRetriever, final Clock clock) {
+		this.slotsRetriever = checkNotNull(slotsRetriever);
+		this.clock = checkNotNull(clock);
+	}
+
+	PhysicalSlotRequestBulk createPhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest> physicalSlotRequests) {
+		final PhysicalSlotRequestBulk slotRequestBulk = new PhysicalSlotRequestBulk(physicalSlotRequests);
+		slotRequestBulk.markUnfulfillable(clock.relativeTimeMillis());
+
+		return slotRequestBulk;
+	}
+
+	/**
+	 * Check the slot request bulk and timeout its requests if it has been unfulfillable for too long.
+	 * @param slotRequestBulk bulk of slot requests
+	 * @param slotRequestTimeout indicates how long a pending request can be unfulfillable
+	 * @return result of the check, indicating the bulk is fulfilled, still pending, or timed out
+	 */
+	TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+			final PhysicalSlotRequestBulk slotRequestBulk,
+			final Time slotRequestTimeout) {
+
+		if (slotRequestBulk.getPendingRequests().isEmpty()) {
+			return TimeoutCheckResult.FULFILLED;
+		}
+
+		final boolean fulfillable = isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+		if (fulfillable) {
+			slotRequestBulk.markFulfillable();
+		} else {
+			final long currentTimestamp = clock.relativeTimeMillis();
+
+			slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+			final long unfulfillableSince = slotRequestBulk.getUnfulfillableSince();
+			if (unfulfillableSince + slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+				return TimeoutCheckResult.TIMEOUT;
+			}
+		}
+
+		return TimeoutCheckResult.PENDING;
+	}
+
+	/**
+	 * Returns whether the given bulk of slot requests are possible to be fulfilled at the same time
+	 * with all the reusable slots in the slot pool. A reusable slot means the slot is available or
+	 * will not be occupied indefinitely.
+	 *
+	 * @param slotRequestBulk bulk of slot requests to check
+	 * @param slotsRetriever supplies slots to be used for the fulfill-ability check
+	 * @return true if the slot requests are possible to be fulfilled, otherwise false
+	 */
+	@VisibleForTesting
+	static boolean isSlotRequestBulkFulfillable(
+			final PhysicalSlotRequestBulk slotRequestBulk,
+			final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+		final Set<AllocationID> assignedSlots = new HashSet<>(slotRequestBulk.getFulfilledRequests().values());
+		final Set<SlotInfo> reusableSlots = getReusableSlots(slotsRetriever, assignedSlots);
+		return areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests().values(), reusableSlots);
+	}
+
+	private static Set<SlotInfo> getReusableSlots(
+			final Supplier<Set<SlotInfo>> slotsRetriever,
+			final Set<AllocationID> slotsToExclude) {
+
+		return slotsRetriever.get().stream()
+			.filter(slotInfo -> !slotInfo.willBeOccupiedIndefinitely())
+			.filter(slotInfo -> !slotsToExclude.contains(slotInfo.getAllocationId()))
+			.collect(Collectors.toSet());
+	}
+
+	private static boolean areRequestsFulfillableWithSlots(
+			final Collection<ResourceProfile> requestResourceProfiles,
+			final Set<SlotInfo> slots) {
+
+		final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+		for (ResourceProfile requestResourceProfile : requestResourceProfiles) {
+			final Optional<SlotInfo> matchedSlot = findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+			if (matchedSlot.isPresent()) {
+				remainingSlots.remove(matchedSlot.get());
+			} else {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static Optional<SlotInfo> findMatchingSlotForRequest(
+			final ResourceProfile requestResourceProfile,
+			final Collection<SlotInfo> slots) {
+
+		return slots.stream().filter(slot -> slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
+	}
+
+	enum TimeoutCheckResult {
+		PENDING,
+
+		FULFILLED,
+
+		TIMEOUT
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 4392e70..89932c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -191,9 +191,7 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
 	 * Disables batch slot request timeout check. Invoked when someone else wants to
 	 * take over the timeout check responsibility.
 	 */
-	default void disableBatchSlotRequestTimeoutCheck() {
-		throw new UnsupportedOperationException("Not properly implemented.");
-	}
+	void disableBatchSlotRequestTimeoutCheck();
 
 	/**
 	 * Create report about the allocated slots belonging to the specified task manager.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index d9407f5..d305585 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -134,7 +134,7 @@ public class SlotPoolImpl implements SlotPool {
 
 	private ComponentMainThreadExecutor componentMainThreadExecutor;
 
-	private boolean batchSlotRequestTimeoutCheckEnabled;
+	protected boolean batchSlotRequestTimeoutCheckEnabled;
 
 	// ------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 24f0f20..ce1716b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -610,6 +610,11 @@ public class JobMasterTest extends TestLogger {
 		}
 
 		@Override
+		public void disableBatchSlotRequestTimeoutCheck() {
+			// no action and no exception is expected
+		}
+
+		@Override
 		public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
 			final Collection<SlotInfo> slotInfos = registeredSlots.getOrDefault(taskManagerId, Collections.emptyList());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java
new file mode 100644
index 0000000..8d4d0ad
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link BulkSlotProviderImpl}.
+ */
+public class BulkSlotProviderImplTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.milliseconds(1000L);
+
+	private static ScheduledExecutorService singleThreadScheduledExecutorService;
+
+	private static ComponentMainThreadExecutor mainThreadExecutor;
+
+	private TestingSlotPoolImpl slotPool;
+
+	private BulkSlotProviderImpl bulkSlotProvider;
+
+	private ManualClock clock;
+
+	@BeforeClass
+	public static void setupClass() {
+		singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+		mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
+	}
+
+	@AfterClass
+	public static void teardownClass() {
+		if (singleThreadScheduledExecutorService != null) {
+			singleThreadScheduledExecutorService.shutdownNow();
+		}
+	}
+
+	@Before
+	public void setup() throws Exception {
+		clock = new ManualClock();
+
+		slotPool = new SlotPoolBuilder(mainThreadExecutor).build();
+
+		bulkSlotProvider = new BulkSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
+		bulkSlotProvider.start(mainThreadExecutor);
+	}
+
+	@After
+	public void teardown() {
+		CompletableFuture.runAsync(() -> slotPool.close(), mainThreadExecutor).join();
+	}
+
+	@Test
+	public void testBulkSlotAllocationFulfilledWithAvailableSlots() throws Exception {
+		final PhysicalSlotRequest request1 = createPhysicalSlotRequest();
+		final PhysicalSlotRequest request2 = createPhysicalSlotRequest();
+		final List<PhysicalSlotRequest> requests = Arrays.asList(request1, request2);
+
+		addSlotToSlotPool();
+		addSlotToSlotPool();
+
+		final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);
+
+		final Collection<PhysicalSlotRequest.Result> results = slotFutures.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+		final Collection<SlotRequestId> resultRequestIds = results.stream()
+			.map(PhysicalSlotRequest.Result::getSlotRequestId)
+			.collect(Collectors.toList());
+
+		assertThat(resultRequestIds, containsInAnyOrder(request1.getSlotRequestId(), request2.getSlotRequestId()));
+	}
+
+	@Test
+	public void testBulkSlotAllocationFulfilledWithNewSlots() {
+		final List<PhysicalSlotRequest> requests = Arrays.asList(
+			createPhysicalSlotRequest(),
+			createPhysicalSlotRequest());
+		final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);
+
+		addSlotToSlotPool();
+
+		assertThat(slotFutures.isDone(), is(false));
+
+		addSlotToSlotPool();
+
+		assertThat(slotFutures.isDone(), is(true));
+		assertThat(slotFutures.isCompletedExceptionally(), is(false));
+	}
+
+	@Test
+	public void testBulkSlotAllocationTimeoutsIfUnfulfillable() {
+		final Exception exception = allocateSlotsAndWaitForTimeout();
+
+		final Optional<Throwable> cause = ExceptionUtils.findThrowableWithMessage(
+			exception,
+			"Slot request bulk is not fulfillable!");
+		assertThat(cause.isPresent(), is(true));
+		assertThat(cause.get(), instanceOf(TimeoutException.class));
+	}
+
+	@Test
+	public void testFailedBulkSlotAllocationReleasesAllocatedSlot() {
+		allocateSlotsAndWaitForTimeout();
+
+		assertThat(slotPool.getAllocatedSlots().listSlotInfo(), hasSize(0));
+	}
+
+	@Test
+	public void testFailedBulkSlotAllocationClearsPendingRequests() {
+		allocateSlotsAndWaitForTimeout();
+
+		assertThat(slotPool.getPendingRequests().values(), hasSize(0));
+	}
+
+	private Exception allocateSlotsAndWaitForTimeout() {
+		final List<PhysicalSlotRequest> requests = Arrays.asList(
+			createPhysicalSlotRequest(),
+			createPhysicalSlotRequest());
+		final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);
+
+		addSlotToSlotPool();
+
+		assertThat(slotPool.getAllocatedSlots().listSlotInfo(), hasSize(1));
+
+		clock.advanceTime(TIMEOUT.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
+
+		try {
+			// wait util the requests timed out
+			slotFutures.get();
+
+			fail("Expected that the slot futures time out.");
+			return new Exception("Unexpected");
+		} catch (Exception e) {
+			// expected
+			return e;
+		}
+	}
+
+	@Test
+	public void testIndividualBatchSlotRequestTimeoutCheckIsDisabled() {
+		assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false));
+	}
+
+	private void addSlotToSlotPool() {
+		SlotPoolUtils.offerSlots(slotPool, mainThreadExecutor, Collections.singletonList(ResourceProfile.ANY));
+	}
+
+	private CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocateSlots(
+			final Collection<PhysicalSlotRequest> requests) {
+
+		return CompletableFuture
+			.supplyAsync(
+				() -> bulkSlotProvider.allocatePhysicalSlots(
+					requests,
+					TIMEOUT),
+				mainThreadExecutor)
+			.thenCompose(Function.identity());
+	}
+
+	private static PhysicalSlotRequest createPhysicalSlotRequest() {
+		return new PhysicalSlotRequest(
+			new SlotRequestId(),
+			SlotProfile.noLocality(ResourceProfile.UNKNOWN),
+			true);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
new file mode 100644
index 0000000..557ba54
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulkChecker}.
+ */
+public class PhysicalSlotRequestBulkCheckerTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.milliseconds(5000L);
+
+	private final ManualClock clock = new ManualClock();
+
+	private PhysicalSlotRequestBulkChecker bulkChecker;
+
+	private Set<PhysicalSlot> slots;
+
+	private Supplier<Set<SlotInfo>> slotsRetriever;
+
+	@Before
+	public void setup() throws Exception {
+		slots = new HashSet<>();
+		slotsRetriever = () -> new HashSet<>(slots);
+		bulkChecker = new PhysicalSlotRequestBulkChecker(slotsRetriever, clock);
+	}
+
+	@Test
+	public void testCreateBulk() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(Collections.emptyList());
+
+		assertThat(bulk.getUnfulfillableSince(), is(clock.relativeTimeMillis()));
+	}
+
+	@Test
+	public void testBulkFulfilledOnCheck() {
+		final PhysicalSlotRequest request = createPhysicalSlotRequest();
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Collections.singletonList(request));
+
+		bulk.markRequestFulfilled(request.getSlotRequestId(), new AllocationID());
+
+		assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.FULFILLED));
+	}
+
+	@Test
+	public void testBulkTimeoutOnCheck() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(createPhysicalSlotRequest()));
+
+		clock.advanceTime(TIMEOUT.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
+		assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.TIMEOUT));
+	}
+
+	@Test
+	public void testBulkPendingOnCheckIfFulfillable() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Collections.singletonList(createPhysicalSlotRequest()));
+
+		final PhysicalSlot slot = addOneSlot();
+		occupySlot(slot, false);
+
+		assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.PENDING));
+	}
+
+	@Test
+	public void testBulkPendingOnCheckIfUnfulfillableButNotTimedOut() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Collections.singletonList(createPhysicalSlotRequest()));
+
+		assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.PENDING));
+	}
+
+	@Test
+	public void testBulkFulfillable() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Collections.singletonList(createPhysicalSlotRequest()));
+
+		addOneSlot();
+
+		assertThat(isFulfillable(bulk), is(true));
+	}
+
+	@Test
+	public void testBulkUnfulfillableWithInsufficientSlots() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(createPhysicalSlotRequest(), createPhysicalSlotRequest()));
+
+		addOneSlot();
+
+		assertThat(isFulfillable(bulk), is(false));
+	}
+
+	@Test
+	public void testBulkUnfulfillableWithSlotAlreadyAssignedToBulk() {
+		final PhysicalSlotRequest request = createPhysicalSlotRequest();
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(request, createPhysicalSlotRequest()));
+
+		final PhysicalSlot slot = addOneSlot();
+
+		bulk.markRequestFulfilled(request.getSlotRequestId(), slot.getAllocationId());
+
+		assertThat(isFulfillable(bulk), is(false));
+	}
+
+	@Test
+	public void testBulkUnfulfillableWithSlotOccupiedIndefinitely() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(createPhysicalSlotRequest(), createPhysicalSlotRequest()));
+
+		final PhysicalSlot slot1 = addOneSlot();
+		addOneSlot();
+
+		occupySlot(slot1, true);
+
+		assertThat(isFulfillable(bulk), is(false));
+	}
+
+	@Test
+	public void testBulkFulfillableWithSlotOccupiedTemporarily() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(createPhysicalSlotRequest(), createPhysicalSlotRequest()));
+
+		final PhysicalSlot slot1 = addOneSlot();
+		addOneSlot();
+
+		occupySlot(slot1, false);
+
+		assertThat(isFulfillable(bulk), is(true));
+	}
+
+	private static PhysicalSlotRequest createPhysicalSlotRequest() {
+		return new PhysicalSlotRequest(
+			new SlotRequestId(),
+			SlotProfile.noLocality(ResourceProfile.UNKNOWN),
+			true);
+	}
+
+	private static void occupySlot(final PhysicalSlot slotToOccupy, final boolean slotWillBeOccupiedIndefinitely) {
+		SingleLogicalSlot.allocateFromPhysicalSlot(
+			new SlotRequestId(),
+			slotToOccupy,
+			Locality.UNKNOWN,
+			new TestingSlotOwner(),
+			slotWillBeOccupiedIndefinitely);
+	}
+
+	private PhysicalSlot addOneSlot() {
+		final PhysicalSlot slot = new AllocatedSlot(
+			new AllocationID(),
+			new LocalTaskManagerLocation(),
+			0,
+			ResourceProfile.ANY,
+			new SimpleAckingTaskManagerGateway());
+		slots.add(slot);
+
+		return slot;
+	}
+
+	private PhysicalSlotRequestBulkChecker.TimeoutCheckResult checkBulkTimeout(final PhysicalSlotRequestBulk bulk) {
+		return bulkChecker.checkPhysicalSlotRequestBulkTimeout(bulk, TIMEOUT);
+	}
+
+	private boolean isFulfillable(final PhysicalSlotRequestBulk bulk) {
+		return PhysicalSlotRequestBulkChecker.isSlotRequestBulkFulfillable(bulk, slotsRetriever);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java
new file mode 100644
index 0000000..cf6c556
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulk}.
+ */
+public class PhysicalSlotRequestBulkTest extends TestLogger {
+
+	private final ManualClock clock = new ManualClock();
+
+	@Test
+	public void testMarkBulkUnfulfillable() {
+		final PhysicalSlotRequestBulk bulk = new PhysicalSlotRequestBulk(Collections.emptyList());
+
+		clock.advanceTime(456, TimeUnit.MILLISECONDS);
+		bulk.markUnfulfillable(clock.relativeTimeMillis());
+
+		assertThat(bulk.getUnfulfillableSince(), is(clock.relativeTimeMillis()));
+	}
+
+	@Test
+	public void testUnfulfillableTimestampWillNotBeOverriddenByFollowingUnfulfillableTimestamp() {
+		final PhysicalSlotRequestBulk bulk = new PhysicalSlotRequestBulk(Collections.emptyList());
+
+		final long unfulfillableSince = clock.relativeTimeMillis();
+		bulk.markUnfulfillable(unfulfillableSince);
+
+		clock.advanceTime(456, TimeUnit.MILLISECONDS);
+		bulk.markUnfulfillable(clock.relativeTimeMillis());
+
+		assertThat(bulk.getUnfulfillableSince(), is(unfulfillableSince));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
index 776b51e..5006c8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
@@ -65,6 +65,10 @@ public class TestingSlotPoolImpl extends SlotPoolImpl {
 		runAsync(this::checkBatchSlotTimeout);
 	}
 
+	boolean isBatchSlotRequestTimeoutCheckEnabled() {
+		return batchSlotRequestTimeoutCheckEnabled;
+	}
+
 	@Override
 	public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
 			final SlotRequestId slotRequestId,