You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/11 15:06:17 UTC
[flink] 09/10: [FLINK-17017][runtime] Introduce BulkSlotProvider
which allocates physical slots in bulks
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 044def9b50fe4249eaa3f1472981404425a11fd9
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jun 5 18:40:16 2020 +0800
[FLINK-17017][runtime] Introduce BulkSlotProvider which allocates physical slots in bulks
---
.../jobmaster/slotpool/BulkSlotProvider.java | 60 ++++++
.../jobmaster/slotpool/BulkSlotProviderImpl.java | 213 ++++++++++++++++++++
.../jobmaster/slotpool/PhysicalSlotRequest.java | 79 ++++++++
.../slotpool/PhysicalSlotRequestBulk.java | 79 ++++++++
.../slotpool/PhysicalSlotRequestBulkChecker.java | 148 ++++++++++++++
.../flink/runtime/jobmaster/slotpool/SlotPool.java | 4 +-
.../runtime/jobmaster/slotpool/SlotPoolImpl.java | 2 +-
.../flink/runtime/jobmaster/JobMasterTest.java | 5 +
.../slotpool/BulkSlotProviderImplTest.java | 214 +++++++++++++++++++++
.../PhysicalSlotRequestBulkCheckerTest.java | 209 ++++++++++++++++++++
.../slotpool/PhysicalSlotRequestBulkTest.java | 61 ++++++
.../jobmaster/slotpool/TestingSlotPoolImpl.java | 4 +
12 files changed, 1074 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java
new file mode 100644
index 0000000..cbcedf9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The bulk slot provider serves physical slot requests.
+ */
+public interface BulkSlotProvider {
+
+ /**
+ * Starts the slot provider by initializing the main thread executor.
+ *
+ * @param mainThreadExecutor the main thread executor of the job master
+ */
+ void start(ComponentMainThreadExecutor mainThreadExecutor);
+
+ /**
+ * Allocates a bulk of physical slots. The allocation will be completed
+ * normally only when all the requests are fulfilled.
+ *
+ * @param physicalSlotRequests requests for physical slots
+ * @param timeout indicating how long it is accepted that the slot requests can be unfulfillable
+ * @return future of the results of slot requests
+ */
+ CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+ Collection<PhysicalSlotRequest> physicalSlotRequests,
+ Time timeout);
+
+ /**
+ * Cancels the slot request with the given {@link SlotRequestId}. If the request is already fulfilled
+ * with a physical slot, the slot will be released.
+ *
+ * @param slotRequestId identifying the slot request to cancel
+ * @param cause of the cancellation
+ */
+ void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
new file mode 100644
index 0000000..b380999
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link BulkSlotProvider}.
+ */
+class BulkSlotProviderImpl implements BulkSlotProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BulkSlotProviderImpl.class);
+
+ private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+ private final SlotSelectionStrategy slotSelectionStrategy;
+
+ private final SlotPool slotPool;
+
+ private final PhysicalSlotRequestBulkChecker slotRequestBulkChecker;
+
+ BulkSlotProviderImpl(final SlotSelectionStrategy slotSelectionStrategy, final SlotPool slotPool) {
+ this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
+ this.slotPool = checkNotNull(slotPool);
+
+ this.slotRequestBulkChecker = new PhysicalSlotRequestBulkChecker(
+ this::getAllSlotInfos,
+ SystemClock.getInstance());
+
+ this.componentMainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
+ "Scheduler is not initialized with proper main thread executor. " +
+ "Call to BulkSlotProvider.start(...) required.");
+ }
+
+ @Override
+ public void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+ this.componentMainThreadExecutor = mainThreadExecutor;
+
+ slotPool.disableBatchSlotRequestTimeoutCheck();
+ }
+
+ @Override
+ public void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause) {
+ componentMainThreadExecutor.assertRunningInMainThread();
+
+ slotPool.releaseSlot(slotRequestId, cause);
+ }
+
+ @Override
+ public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+ final Collection<PhysicalSlotRequest> physicalSlotRequests,
+ final Time timeout) {
+
+ componentMainThreadExecutor.assertRunningInMainThread();
+
+ LOG.debug("Received {} slot requests.", physicalSlotRequests.size());
+
+ final PhysicalSlotRequestBulk slotRequestBulk =
+ slotRequestBulkChecker.createPhysicalSlotRequestBulk(physicalSlotRequests);
+
+ final List<CompletableFuture<PhysicalSlotRequest.Result>> resultFutures = new ArrayList<>(physicalSlotRequests.size());
+ for (PhysicalSlotRequest request : physicalSlotRequests) {
+ final CompletableFuture<PhysicalSlotRequest.Result> resultFuture =
+ allocatePhysicalSlot(request).thenApply(result -> {
+ slotRequestBulk.markRequestFulfilled(
+ result.getSlotRequestId(),
+ result.getPhysicalSlot().getAllocationId());
+
+ return result;
+ });
+ resultFutures.add(resultFuture);
+ }
+
+ schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
+
+ return FutureUtils.combineAll(resultFutures);
+ }
+
+ private CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
+ final PhysicalSlotRequest physicalSlotRequest) {
+
+ final SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+ final SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
+ final ResourceProfile resourceProfile = slotProfile.getPhysicalSlotResourceProfile();
+
+ LOG.debug("Received slot request [{}] with resource requirements: {}", slotRequestId, resourceProfile);
+
+ final Optional<PhysicalSlot> availablePhysicalSlot = tryAllocateFromAvailable(slotRequestId, slotProfile);
+
+ final CompletableFuture<PhysicalSlot> slotFuture;
+ if (availablePhysicalSlot.isPresent()) {
+ slotFuture = CompletableFuture.completedFuture(availablePhysicalSlot.get());
+ } else {
+ slotFuture = requestNewSlot(
+ slotRequestId,
+ resourceProfile,
+ physicalSlotRequest.willSlotBeOccupiedIndefinitely());
+ }
+
+ return slotFuture.thenApply(physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot));
+ }
+
+ private Optional<PhysicalSlot> tryAllocateFromAvailable(
+ final SlotRequestId slotRequestId,
+ final SlotProfile slotProfile) {
+
+ final Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfoList =
+ slotPool.getAvailableSlotsInformation()
+ .stream()
+ .map(SlotSelectionStrategy.SlotInfoAndResources::fromSingleSlot)
+ .collect(Collectors.toList());
+
+ final Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot =
+ slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);
+
+ return selectedAvailableSlot.flatMap(
+ slotInfoAndLocality -> slotPool.allocateAvailableSlot(
+ slotRequestId,
+ slotInfoAndLocality.getSlotInfo().getAllocationId())
+ );
+ }
+
+ private CompletableFuture<PhysicalSlot> requestNewSlot(
+ final SlotRequestId slotRequestId,
+ final ResourceProfile resourceProfile,
+ final boolean willSlotBeOccupiedIndefinitely) {
+
+ if (willSlotBeOccupiedIndefinitely) {
+ return slotPool.requestNewAllocatedSlot(slotRequestId, resourceProfile, null);
+ } else {
+ return slotPool.requestNewAllocatedBatchSlot(slotRequestId, resourceProfile);
+ }
+ }
+
+ private void schedulePendingRequestBulkTimeoutCheck(
+ final PhysicalSlotRequestBulk slotRequestBulk,
+ final Time timeout) {
+
+ componentMainThreadExecutor.schedule(() -> {
+ final PhysicalSlotRequestBulkChecker.TimeoutCheckResult result =
+ slotRequestBulkChecker.checkPhysicalSlotRequestBulkTimeout(slotRequestBulk, timeout);
+
+ switch (result) {
+ case PENDING:
+ //re-schedule the timeout check
+ schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
+ break;
+ case TIMEOUT:
+ timeoutSlotRequestBulk(slotRequestBulk);
+ break;
+ default: // no action to take
+ }
+ }, timeout.getSize(), timeout.getUnit());
+ }
+
+ private void timeoutSlotRequestBulk(final PhysicalSlotRequestBulk slotRequestBulk) {
+ final Exception cause = new TimeoutException("Slot request bulk is not fulfillable!");
+ // pending requests must be canceled first otherwise they might be fulfilled by
+ // allocated slots released from this bulk
+ for (SlotRequestId slotRequestId : slotRequestBulk.getPendingRequests().keySet()) {
+ cancelSlotRequest(slotRequestId, cause);
+ }
+ for (SlotRequestId slotRequestId : slotRequestBulk.getFulfilledRequests().keySet()) {
+ cancelSlotRequest(slotRequestId, cause);
+ }
+ }
+
+ private Set<SlotInfo> getAllSlotInfos() {
+ return Stream
+ .concat(
+ slotPool.getAvailableSlotsInformation().stream(),
+ slotPool.getAllocatedSlotsInformation().stream())
+ .collect(Collectors.toSet());
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
new file mode 100644
index 0000000..b953e43
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+/**
+ * Represents a request for a physical slot.
+ */
+public class PhysicalSlotRequest {
+
+ private final SlotRequestId slotRequestId;
+
+ private final SlotProfile slotProfile;
+
+ private final boolean slotWillBeOccupiedIndefinitely;
+
+ public PhysicalSlotRequest(
+ final SlotRequestId slotRequestId,
+ final SlotProfile slotProfile,
+ final boolean slotWillBeOccupiedIndefinitely) {
+
+ this.slotRequestId = slotRequestId;
+ this.slotProfile = slotProfile;
+ this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+ }
+
+ SlotRequestId getSlotRequestId() {
+ return slotRequestId;
+ }
+
+ SlotProfile getSlotProfile() {
+ return slotProfile;
+ }
+
+ boolean willSlotBeOccupiedIndefinitely() {
+ return slotWillBeOccupiedIndefinitely;
+ }
+
+ /**
+ * Result of a {@link PhysicalSlotRequest}.
+ */
+ public static class Result {
+
+ private final SlotRequestId slotRequestId;
+
+ private final PhysicalSlot physicalSlot;
+
+ Result(final SlotRequestId slotRequestId, final PhysicalSlot physicalSlot) {
+ this.slotRequestId = slotRequestId;
+ this.physicalSlot = physicalSlot;
+ }
+
+ public SlotRequestId getSlotRequestId() {
+ return slotRequestId;
+ }
+
+ public PhysicalSlot getPhysicalSlot() {
+ return physicalSlot;
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
new file mode 100644
index 0000000..0d63a3b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a bulk of physical slot requests.
+ */
+class PhysicalSlotRequestBulk {
+
+ private final Map<SlotRequestId, ResourceProfile> pendingRequests;
+
+ private final Map<SlotRequestId, AllocationID> fulfilledRequests = new HashMap<>();
+
+ private long unfulfillableTimestamp = Long.MAX_VALUE;
+
+ PhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest> physicalSlotRequests) {
+ this.pendingRequests = physicalSlotRequests.stream()
+ .collect(Collectors.toMap(
+ PhysicalSlotRequest::getSlotRequestId,
+ r -> r.getSlotProfile().getPhysicalSlotResourceProfile()));
+ }
+
+ void markRequestFulfilled(final SlotRequestId slotRequestId, final AllocationID allocationID) {
+ pendingRequests.remove(slotRequestId);
+ fulfilledRequests.put(slotRequestId, allocationID);
+ }
+
+ Map<SlotRequestId, ResourceProfile> getPendingRequests() {
+ return Collections.unmodifiableMap(pendingRequests);
+ }
+
+ Map<SlotRequestId, AllocationID> getFulfilledRequests() {
+ return Collections.unmodifiableMap(fulfilledRequests);
+ }
+
+ void markFulfillable() {
+ unfulfillableTimestamp = Long.MAX_VALUE;
+ }
+
+ void markUnfulfillable(final long currentTimestamp) {
+ if (isFulfillable()) {
+ unfulfillableTimestamp = currentTimestamp;
+ }
+ }
+
+ long getUnfulfillableSince() {
+ return unfulfillableTimestamp;
+ }
+
+ private boolean isFulfillable() {
+ return unfulfillableTimestamp == Long.MAX_VALUE;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
new file mode 100644
index 0000000..98ac706
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class helps to check the status of physical slot request bulks.
+ */
+class PhysicalSlotRequestBulkChecker {
+
+ private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+ private final Clock clock;
+
+ PhysicalSlotRequestBulkChecker(final Supplier<Set<SlotInfo>> slotsRetriever, final Clock clock) {
+ this.slotsRetriever = checkNotNull(slotsRetriever);
+ this.clock = checkNotNull(clock);
+ }
+
+ PhysicalSlotRequestBulk createPhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest> physicalSlotRequests) {
+ final PhysicalSlotRequestBulk slotRequestBulk = new PhysicalSlotRequestBulk(physicalSlotRequests);
+ slotRequestBulk.markUnfulfillable(clock.relativeTimeMillis());
+
+ return slotRequestBulk;
+ }
+
+ /**
+ * Check the slot request bulk and timeout its requests if it has been unfulfillable for too long.
+ * @param slotRequestBulk bulk of slot requests
+ * @param slotRequestTimeout indicates how long a pending request can be unfulfillable
+ * @return result of the check, indicating the bulk is fulfilled, still pending, or timed out
+ */
+ TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+ final PhysicalSlotRequestBulk slotRequestBulk,
+ final Time slotRequestTimeout) {
+
+ if (slotRequestBulk.getPendingRequests().isEmpty()) {
+ return TimeoutCheckResult.FULFILLED;
+ }
+
+ final boolean fulfillable = isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+ if (fulfillable) {
+ slotRequestBulk.markFulfillable();
+ } else {
+ final long currentTimestamp = clock.relativeTimeMillis();
+
+ slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+ final long unfulfillableSince = slotRequestBulk.getUnfulfillableSince();
+ if (unfulfillableSince + slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+ return TimeoutCheckResult.TIMEOUT;
+ }
+ }
+
+ return TimeoutCheckResult.PENDING;
+ }
+
+ /**
+ * Returns whether the given bulk of slot requests are possible to be fulfilled at the same time
+ * with all the reusable slots in the slot pool. A reusable slot means the slot is available or
+ * will not be occupied indefinitely.
+ *
+ * @param slotRequestBulk bulk of slot requests to check
+ * @param slotsRetriever supplies slots to be used for the fulfill-ability check
+ * @return true if the slot requests are possible to be fulfilled, otherwise false
+ */
+ @VisibleForTesting
+ static boolean isSlotRequestBulkFulfillable(
+ final PhysicalSlotRequestBulk slotRequestBulk,
+ final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+ final Set<AllocationID> assignedSlots = new HashSet<>(slotRequestBulk.getFulfilledRequests().values());
+ final Set<SlotInfo> reusableSlots = getReusableSlots(slotsRetriever, assignedSlots);
+ return areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests().values(), reusableSlots);
+ }
+
+ private static Set<SlotInfo> getReusableSlots(
+ final Supplier<Set<SlotInfo>> slotsRetriever,
+ final Set<AllocationID> slotsToExclude) {
+
+ return slotsRetriever.get().stream()
+ .filter(slotInfo -> !slotInfo.willBeOccupiedIndefinitely())
+ .filter(slotInfo -> !slotsToExclude.contains(slotInfo.getAllocationId()))
+ .collect(Collectors.toSet());
+ }
+
+ private static boolean areRequestsFulfillableWithSlots(
+ final Collection<ResourceProfile> requestResourceProfiles,
+ final Set<SlotInfo> slots) {
+
+ final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+ for (ResourceProfile requestResourceProfile : requestResourceProfiles) {
+ final Optional<SlotInfo> matchedSlot = findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+ if (matchedSlot.isPresent()) {
+ remainingSlots.remove(matchedSlot.get());
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static Optional<SlotInfo> findMatchingSlotForRequest(
+ final ResourceProfile requestResourceProfile,
+ final Collection<SlotInfo> slots) {
+
+ return slots.stream().filter(slot -> slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
+ }
+
+ enum TimeoutCheckResult {
+ PENDING,
+
+ FULFILLED,
+
+ TIMEOUT
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 4392e70..89932c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -191,9 +191,7 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
* Disables batch slot request timeout check. Invoked when someone else wants to
* take over the timeout check responsibility.
*/
- default void disableBatchSlotRequestTimeoutCheck() {
- throw new UnsupportedOperationException("Not properly implemented.");
- }
+ void disableBatchSlotRequestTimeoutCheck();
/**
* Create report about the allocated slots belonging to the specified task manager.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index d9407f5..d305585 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -134,7 +134,7 @@ public class SlotPoolImpl implements SlotPool {
private ComponentMainThreadExecutor componentMainThreadExecutor;
- private boolean batchSlotRequestTimeoutCheckEnabled;
+ protected boolean batchSlotRequestTimeoutCheckEnabled;
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 24f0f20..ce1716b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -610,6 +610,11 @@ public class JobMasterTest extends TestLogger {
}
@Override
+ public void disableBatchSlotRequestTimeoutCheck() {
+ // no action and no exception is expected
+ }
+
+ @Override
public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
final Collection<SlotInfo> slotInfos = registeredSlots.getOrDefault(taskManagerId, Collections.emptyList());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java
new file mode 100644
index 0000000..8d4d0ad
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link BulkSlotProviderImpl}.
+ */
+public class BulkSlotProviderImplTest extends TestLogger {
+
+ private static final Time TIMEOUT = Time.milliseconds(1000L);
+
+ private static ScheduledExecutorService singleThreadScheduledExecutorService;
+
+ private static ComponentMainThreadExecutor mainThreadExecutor;
+
+ private TestingSlotPoolImpl slotPool;
+
+ private BulkSlotProviderImpl bulkSlotProvider;
+
+ private ManualClock clock;
+
+ @BeforeClass
+ public static void setupClass() {
+ singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
+ }
+
+ @AfterClass
+ public static void teardownClass() {
+ if (singleThreadScheduledExecutorService != null) {
+ singleThreadScheduledExecutorService.shutdownNow();
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ clock = new ManualClock();
+
+ slotPool = new SlotPoolBuilder(mainThreadExecutor).build();
+
+ bulkSlotProvider = new BulkSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
+ bulkSlotProvider.start(mainThreadExecutor);
+ }
+
+ @After
+ public void teardown() {
+ CompletableFuture.runAsync(() -> slotPool.close(), mainThreadExecutor).join();
+ }
+
+ @Test
+ public void testBulkSlotAllocationFulfilledWithAvailableSlots() throws Exception {
+ final PhysicalSlotRequest request1 = createPhysicalSlotRequest();
+ final PhysicalSlotRequest request2 = createPhysicalSlotRequest();
+ final List<PhysicalSlotRequest> requests = Arrays.asList(request1, request2);
+
+ addSlotToSlotPool();
+ addSlotToSlotPool();
+
+ final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);
+
+ final Collection<PhysicalSlotRequest.Result> results = slotFutures.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+ final Collection<SlotRequestId> resultRequestIds = results.stream()
+ .map(PhysicalSlotRequest.Result::getSlotRequestId)
+ .collect(Collectors.toList());
+
+ assertThat(resultRequestIds, containsInAnyOrder(request1.getSlotRequestId(), request2.getSlotRequestId()));
+ }
+
+ @Test
+ public void testBulkSlotAllocationFulfilledWithNewSlots() {
+ final List<PhysicalSlotRequest> requests = Arrays.asList(
+ createPhysicalSlotRequest(),
+ createPhysicalSlotRequest());
+ final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);
+
+ addSlotToSlotPool();
+
+ assertThat(slotFutures.isDone(), is(false));
+
+ addSlotToSlotPool();
+
+ assertThat(slotFutures.isDone(), is(true));
+ assertThat(slotFutures.isCompletedExceptionally(), is(false));
+ }
+
+ @Test
+ public void testBulkSlotAllocationTimeoutsIfUnfulfillable() {
+ final Exception exception = allocateSlotsAndWaitForTimeout();
+
+ final Optional<Throwable> cause = ExceptionUtils.findThrowableWithMessage(
+ exception,
+ "Slot request bulk is not fulfillable!");
+ assertThat(cause.isPresent(), is(true));
+ assertThat(cause.get(), instanceOf(TimeoutException.class));
+ }
+
+ @Test
+ public void testFailedBulkSlotAllocationReleasesAllocatedSlot() {
+ allocateSlotsAndWaitForTimeout();
+
+ assertThat(slotPool.getAllocatedSlots().listSlotInfo(), hasSize(0));
+ }
+
+ @Test
+ public void testFailedBulkSlotAllocationClearsPendingRequests() {
+ allocateSlotsAndWaitForTimeout();
+
+ assertThat(slotPool.getPendingRequests().values(), hasSize(0));
+ }
+
+ private Exception allocateSlotsAndWaitForTimeout() {
+ final List<PhysicalSlotRequest> requests = Arrays.asList(
+ createPhysicalSlotRequest(),
+ createPhysicalSlotRequest());
+ final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);
+
+ addSlotToSlotPool();
+
+ assertThat(slotPool.getAllocatedSlots().listSlotInfo(), hasSize(1));
+
+ clock.advanceTime(TIMEOUT.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
+
+ try {
+ // wait util the requests timed out
+ slotFutures.get();
+
+ fail("Expected that the slot futures time out.");
+ return new Exception("Unexpected");
+ } catch (Exception e) {
+ // expected
+ return e;
+ }
+ }
+
+ @Test
+ public void testIndividualBatchSlotRequestTimeoutCheckIsDisabled() {
+ assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false));
+ }
+
+ private void addSlotToSlotPool() {
+ SlotPoolUtils.offerSlots(slotPool, mainThreadExecutor, Collections.singletonList(ResourceProfile.ANY));
+ }
+
+ private CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocateSlots(
+ final Collection<PhysicalSlotRequest> requests) {
+
+ return CompletableFuture
+ .supplyAsync(
+ () -> bulkSlotProvider.allocatePhysicalSlots(
+ requests,
+ TIMEOUT),
+ mainThreadExecutor)
+ .thenCompose(Function.identity());
+ }
+
+ private static PhysicalSlotRequest createPhysicalSlotRequest() {
+ return new PhysicalSlotRequest(
+ new SlotRequestId(),
+ SlotProfile.noLocality(ResourceProfile.UNKNOWN),
+ true);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
new file mode 100644
index 0000000..557ba54
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulkChecker}.
+ */
+public class PhysicalSlotRequestBulkCheckerTest extends TestLogger {
+
+ private static final Time TIMEOUT = Time.milliseconds(5000L);
+
+ private final ManualClock clock = new ManualClock();
+
+ private PhysicalSlotRequestBulkChecker bulkChecker;
+
+ private Set<PhysicalSlot> slots;
+
+ private Supplier<Set<SlotInfo>> slotsRetriever;
+
+ @Before
+ public void setup() throws Exception {
+ slots = new HashSet<>();
+ slotsRetriever = () -> new HashSet<>(slots);
+ bulkChecker = new PhysicalSlotRequestBulkChecker(slotsRetriever, clock);
+ }
+
+ @Test
+ public void testCreateBulk() {
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(Collections.emptyList());
+
+ assertThat(bulk.getUnfulfillableSince(), is(clock.relativeTimeMillis()));
+ }
+
+ @Test
+ public void testBulkFulfilledOnCheck() {
+ final PhysicalSlotRequest request = createPhysicalSlotRequest();
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+ Collections.singletonList(request));
+
+ bulk.markRequestFulfilled(request.getSlotRequestId(), new AllocationID());
+
+ assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.FULFILLED));
+ }
+
+ @Test
+ public void testBulkTimeoutOnCheck() {
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+ Arrays.asList(createPhysicalSlotRequest()));
+
+ clock.advanceTime(TIMEOUT.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
+ assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.TIMEOUT));
+ }
+
+ @Test
+ public void testBulkPendingOnCheckIfFulfillable() {
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+ Collections.singletonList(createPhysicalSlotRequest()));
+
+ final PhysicalSlot slot = addOneSlot();
+ occupySlot(slot, false);
+
+ assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.PENDING));
+ }
+
+ @Test
+ public void testBulkPendingOnCheckIfUnfulfillableButNotTimedOut() {
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+ Collections.singletonList(createPhysicalSlotRequest()));
+
+ assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.PENDING));
+ }
+
+ @Test
+ public void testBulkFulfillable() {
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+ Collections.singletonList(createPhysicalSlotRequest()));
+
+ addOneSlot();
+
+ assertThat(isFulfillable(bulk), is(true));
+ }
+
+ @Test
+ public void testBulkUnfulfillableWithInsufficientSlots() {
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+ Arrays.asList(createPhysicalSlotRequest(), createPhysicalSlotRequest()));
+
+ addOneSlot();
+
+ assertThat(isFulfillable(bulk), is(false));
+ }
+
+ @Test
+ public void testBulkUnfulfillableWithSlotAlreadyAssignedToBulk() {
+ final PhysicalSlotRequest request = createPhysicalSlotRequest();
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+ Arrays.asList(request, createPhysicalSlotRequest()));
+
+ final PhysicalSlot slot = addOneSlot();
+
+ bulk.markRequestFulfilled(request.getSlotRequestId(), slot.getAllocationId());
+
+ assertThat(isFulfillable(bulk), is(false));
+ }
+
+ @Test
+ public void testBulkUnfulfillableWithSlotOccupiedIndefinitely() {
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+ Arrays.asList(createPhysicalSlotRequest(), createPhysicalSlotRequest()));
+
+ final PhysicalSlot slot1 = addOneSlot();
+ addOneSlot();
+
+ occupySlot(slot1, true);
+
+ assertThat(isFulfillable(bulk), is(false));
+ }
+
+ @Test
+ public void testBulkFulfillableWithSlotOccupiedTemporarily() {
+ final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+ Arrays.asList(createPhysicalSlotRequest(), createPhysicalSlotRequest()));
+
+ final PhysicalSlot slot1 = addOneSlot();
+ addOneSlot();
+
+ occupySlot(slot1, false);
+
+ assertThat(isFulfillable(bulk), is(true));
+ }
+
+ private static PhysicalSlotRequest createPhysicalSlotRequest() {
+ return new PhysicalSlotRequest(
+ new SlotRequestId(),
+ SlotProfile.noLocality(ResourceProfile.UNKNOWN),
+ true);
+ }
+
+ private static void occupySlot(final PhysicalSlot slotToOccupy, final boolean slotWillBeOccupiedIndefinitely) {
+ SingleLogicalSlot.allocateFromPhysicalSlot(
+ new SlotRequestId(),
+ slotToOccupy,
+ Locality.UNKNOWN,
+ new TestingSlotOwner(),
+ slotWillBeOccupiedIndefinitely);
+ }
+
+ private PhysicalSlot addOneSlot() {
+ final PhysicalSlot slot = new AllocatedSlot(
+ new AllocationID(),
+ new LocalTaskManagerLocation(),
+ 0,
+ ResourceProfile.ANY,
+ new SimpleAckingTaskManagerGateway());
+ slots.add(slot);
+
+ return slot;
+ }
+
+ private PhysicalSlotRequestBulkChecker.TimeoutCheckResult checkBulkTimeout(final PhysicalSlotRequestBulk bulk) {
+ return bulkChecker.checkPhysicalSlotRequestBulkTimeout(bulk, TIMEOUT);
+ }
+
+ private boolean isFulfillable(final PhysicalSlotRequestBulk bulk) {
+ return PhysicalSlotRequestBulkChecker.isSlotRequestBulkFulfillable(bulk, slotsRetriever);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java
new file mode 100644
index 0000000..cf6c556
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulk}.
+ */
+public class PhysicalSlotRequestBulkTest extends TestLogger {
+
+ private final ManualClock clock = new ManualClock();
+
+ @Test
+ public void testMarkBulkUnfulfillable() {
+ final PhysicalSlotRequestBulk bulk = new PhysicalSlotRequestBulk(Collections.emptyList());
+
+ clock.advanceTime(456, TimeUnit.MILLISECONDS);
+ bulk.markUnfulfillable(clock.relativeTimeMillis());
+
+ assertThat(bulk.getUnfulfillableSince(), is(clock.relativeTimeMillis()));
+ }
+
+ @Test
+ public void testUnfulfillableTimestampWillNotBeOverriddenByFollowingUnfulfillableTimestamp() {
+ final PhysicalSlotRequestBulk bulk = new PhysicalSlotRequestBulk(Collections.emptyList());
+
+ final long unfulfillableSince = clock.relativeTimeMillis();
+ bulk.markUnfulfillable(unfulfillableSince);
+
+ clock.advanceTime(456, TimeUnit.MILLISECONDS);
+ bulk.markUnfulfillable(clock.relativeTimeMillis());
+
+ assertThat(bulk.getUnfulfillableSince(), is(unfulfillableSince));
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
index 776b51e..5006c8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
@@ -65,6 +65,10 @@ public class TestingSlotPoolImpl extends SlotPoolImpl {
runAsync(this::checkBatchSlotTimeout);
}
+ boolean isBatchSlotRequestTimeoutCheckEnabled() {
+ return batchSlotRequestTimeoutCheckEnabled;
+ }
+
@Override
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotRequestId slotRequestId,