You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/11 15:06:08 UTC

[flink] branch master updated (8cb6ef6 -> 9aee0c0)

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8cb6ef6  [FLINK-17829][docs][jdbc] Add documentation for the new JDBC connector
     new dca8848  [hotfix] Fix checkstyle violations in AllocatedSlot
     new 9c2e15a  [hotfix][runtime] Set root cause to pending request released exception
     new 7054f97  [FLINK-17017][runtime] Allow to set whether a physical slot payload will occupy the slot indefinitely
     new 636e36c  [FLINK-17017][runtime] Add SingleLogicalSlot#allocateFromPhysicalSlot(…) for physical slot assignment
     new ccc4d8e  [FLINK-17017][runtime] Enable to get whether a physical slot will be occupied indefinitely
     new f6c275e  [FLINK-17017][runtime] Enable to get allocated slots information of a slot pool
     new 541f28f  [FLINK-17017][runtime] Allow to disable batch slot request timeout check
     new 4c16a34  [FLINK-17017][runtime] Allow nullable timeout for streaming slot request in slot pool
     new 044def9  [FLINK-17017][runtime] Introduce BulkSlotProvider which allocates physical slots in bulks
     new 9aee0c0  [FLINK-17017][runtime] SchedulerImpl supports bulk slot allocation

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/jobmaster/SlotInfo.java   |   7 +
 .../runtime/jobmaster/slotpool/AllocatedSlot.java  |  23 ++-
 .../jobmaster/slotpool/BulkSlotProvider.java       |  60 ++++++
 .../jobmaster/slotpool/BulkSlotProviderImpl.java   | 213 ++++++++++++++++++++
 .../runtime/jobmaster/slotpool/PhysicalSlot.java   |   9 +-
 .../jobmaster/slotpool/PhysicalSlotRequest.java    |  79 ++++++++
 .../slotpool/PhysicalSlotRequestBulk.java          |  79 ++++++++
 .../slotpool/PhysicalSlotRequestBulkChecker.java   | 148 ++++++++++++++
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  |  34 ++--
 .../jobmaster/slotpool/SingleLogicalSlot.java      |  46 ++++-
 .../slotpool/SlotInfoWithUtilization.java          |   5 +
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |  18 +-
 .../runtime/jobmaster/slotpool/SlotPoolImpl.java   |  52 +++--
 .../runtime/jobmaster/slotpool/SlotProvider.java   |  38 +++-
 .../jobmaster/slotpool/SlotSharingManager.java     |   5 +
 .../flink/runtime/instance/SimpleSlotContext.java  |   5 +
 .../flink/runtime/jobmaster/JobMasterTest.java     |  15 +-
 .../slotpool/AllocatedSlotOccupationTest.java      |  77 ++++++++
 .../slotpool/BulkSlotProviderImplTest.java         | 214 +++++++++++++++++++++
 .../PhysicalSlotRequestBulkCheckerTest.java        | 209 ++++++++++++++++++++
 .../slotpool/PhysicalSlotRequestBulkTest.java      |  61 ++++++
 .../jobmaster/slotpool/TestingSlotPoolImpl.java    |   8 +-
 22 files changed, 1362 insertions(+), 43 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java


[flink] 06/10: [FLINK-17017][runtime] Enable to get allocated slots information of a slot pool

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6c275e26002abb42bfcc40ffbf6da54fbed4aa6
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri May 29 11:02:50 2020 +0800

    [FLINK-17017][runtime] Enable to get allocated slots information of a slot pool
---
 .../org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java    | 9 +++++++++
 .../apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java    | 5 +++--
 .../java/org/apache/flink/runtime/jobmaster/JobMasterTest.java   | 5 +++++
 3 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 47046d5..484e810 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -136,6 +137,14 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
 	Collection<SlotInfoWithUtilization> getAvailableSlotsInformation();
 
 	/**
+	 * Returns a collection of {@link SlotInfo} objects about all slots that are currently allocated in
+	 * the slot pool.
+	 *
+	 * @return a collection of {@link SlotInfo} objects about all slots that are currently allocated in the slot pool.
+	 */
+	Collection<SlotInfo> getAllocatedSlotsInformation();
+
+	/**
 	 * Allocates the available slot with the given allocation id under the given request id. This method returns
 	 * {@code null} if no slot with the given allocation id is available.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index cd0024a..1f1bc66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -41,10 +41,10 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
@@ -166,7 +166,8 @@ public class SlotPoolImpl implements SlotPool {
 	//  Getters
 	// ------------------------------------------------------------------------
 
-	private Collection<SlotInfo> getAllocatedSlotsInformation() {
+	@Override
+	public Collection<SlotInfo> getAllocatedSlotsInformation() {
 		return allocatedSlots.listSlotInfo();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 5953260..724129d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -585,6 +585,11 @@ public class JobMasterTest extends TestLogger {
 		}
 
 		@Override
+		public Collection<SlotInfo> getAllocatedSlotsInformation() {
+			return Collections.emptyList();
+		}
+
+		@Override
 		public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID) {
 			throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
 		}


[flink] 09/10: [FLINK-17017][runtime] Introduce BulkSlotProvider which allocates physical slots in bulks

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 044def9b50fe4249eaa3f1472981404425a11fd9
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jun 5 18:40:16 2020 +0800

    [FLINK-17017][runtime] Introduce BulkSlotProvider which allocates physical slots in bulks
---
 .../jobmaster/slotpool/BulkSlotProvider.java       |  60 ++++++
 .../jobmaster/slotpool/BulkSlotProviderImpl.java   | 213 ++++++++++++++++++++
 .../jobmaster/slotpool/PhysicalSlotRequest.java    |  79 ++++++++
 .../slotpool/PhysicalSlotRequestBulk.java          |  79 ++++++++
 .../slotpool/PhysicalSlotRequestBulkChecker.java   | 148 ++++++++++++++
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |   4 +-
 .../runtime/jobmaster/slotpool/SlotPoolImpl.java   |   2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   5 +
 .../slotpool/BulkSlotProviderImplTest.java         | 214 +++++++++++++++++++++
 .../PhysicalSlotRequestBulkCheckerTest.java        | 209 ++++++++++++++++++++
 .../slotpool/PhysicalSlotRequestBulkTest.java      |  61 ++++++
 .../jobmaster/slotpool/TestingSlotPoolImpl.java    |   4 +
 12 files changed, 1074 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java
new file mode 100644
index 0000000..cbcedf9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The bulk slot provider serves physical slot requests.
+ */
+public interface BulkSlotProvider {
+
+	/**
+	 * Starts the slot provider by initializing the main thread executor.
+	 *
+	 * @param mainThreadExecutor the main thread executor of the job master
+	 */
+	void start(ComponentMainThreadExecutor mainThreadExecutor);
+
+	/**
+	 * Allocates a bulk of physical slots. The allocation will be completed
+	 * normally only when all the requests are fulfilled.
+	 *
+	 * @param physicalSlotRequests requests for physical slots
+	 * @param timeout how long the slot requests are allowed to remain unfulfillable before they are timed out
+	 * @return future of the results of slot requests
+	 */
+	CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+		Collection<PhysicalSlotRequest> physicalSlotRequests,
+		Time timeout);
+
+	/**
+	 * Cancels the slot request with the given {@link SlotRequestId}. If the request is already fulfilled
+	 * with a physical slot, the slot will be released.
+	 *
+	 * @param slotRequestId identifying the slot request to cancel
+	 * @param cause of the cancellation
+	 */
+	void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
new file mode 100644
index 0000000..b380999
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link BulkSlotProvider}.
+ */
+class BulkSlotProviderImpl implements BulkSlotProvider {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BulkSlotProviderImpl.class);
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	private final SlotSelectionStrategy slotSelectionStrategy;
+
+	private final SlotPool slotPool;
+
+	private final PhysicalSlotRequestBulkChecker slotRequestBulkChecker;
+
+	BulkSlotProviderImpl(final SlotSelectionStrategy slotSelectionStrategy, final SlotPool slotPool) {
+		this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
+		this.slotPool = checkNotNull(slotPool);
+
+		this.slotRequestBulkChecker = new PhysicalSlotRequestBulkChecker(
+			this::getAllSlotInfos,
+			SystemClock.getInstance());
+
+		this.componentMainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
+			"Scheduler is not initialized with proper main thread executor. " +
+				"Call to BulkSlotProvider.start(...) required.");
+	}
+
+	@Override
+	public void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+		this.componentMainThreadExecutor = mainThreadExecutor;
+
+		slotPool.disableBatchSlotRequestTimeoutCheck();
+	}
+
+	@Override
+	public void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause) {
+		componentMainThreadExecutor.assertRunningInMainThread();
+
+		slotPool.releaseSlot(slotRequestId, cause);
+	}
+
+	@Override
+	public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+			final Collection<PhysicalSlotRequest> physicalSlotRequests,
+			final Time timeout) {
+
+		componentMainThreadExecutor.assertRunningInMainThread();
+
+		LOG.debug("Received {} slot requests.", physicalSlotRequests.size());
+
+		final PhysicalSlotRequestBulk slotRequestBulk =
+			slotRequestBulkChecker.createPhysicalSlotRequestBulk(physicalSlotRequests);
+
+		final List<CompletableFuture<PhysicalSlotRequest.Result>> resultFutures = new ArrayList<>(physicalSlotRequests.size());
+		for (PhysicalSlotRequest request : physicalSlotRequests) {
+			final CompletableFuture<PhysicalSlotRequest.Result> resultFuture =
+				allocatePhysicalSlot(request).thenApply(result -> {
+					slotRequestBulk.markRequestFulfilled(
+						result.getSlotRequestId(),
+						result.getPhysicalSlot().getAllocationId());
+
+					return result;
+				});
+			resultFutures.add(resultFuture);
+		}
+
+		schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
+
+		return FutureUtils.combineAll(resultFutures);
+	}
+
+	private CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
+			final PhysicalSlotRequest physicalSlotRequest) {
+
+		final SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
+		final SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
+		final ResourceProfile resourceProfile = slotProfile.getPhysicalSlotResourceProfile();
+
+		LOG.debug("Received slot request [{}] with resource requirements: {}", slotRequestId, resourceProfile);
+
+		final Optional<PhysicalSlot> availablePhysicalSlot = tryAllocateFromAvailable(slotRequestId, slotProfile);
+
+		final CompletableFuture<PhysicalSlot> slotFuture;
+		if (availablePhysicalSlot.isPresent()) {
+			slotFuture = CompletableFuture.completedFuture(availablePhysicalSlot.get());
+		} else {
+			slotFuture = requestNewSlot(
+				slotRequestId,
+				resourceProfile,
+				physicalSlotRequest.willSlotBeOccupiedIndefinitely());
+		}
+
+		return slotFuture.thenApply(physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot));
+	}
+
+	private Optional<PhysicalSlot> tryAllocateFromAvailable(
+			final SlotRequestId slotRequestId,
+			final SlotProfile slotProfile) {
+
+		final Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfoList =
+			slotPool.getAvailableSlotsInformation()
+				.stream()
+				.map(SlotSelectionStrategy.SlotInfoAndResources::fromSingleSlot)
+				.collect(Collectors.toList());
+
+		final Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot =
+			slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);
+
+		return selectedAvailableSlot.flatMap(
+			slotInfoAndLocality -> slotPool.allocateAvailableSlot(
+				slotRequestId,
+				slotInfoAndLocality.getSlotInfo().getAllocationId())
+		);
+	}
+
+	private CompletableFuture<PhysicalSlot> requestNewSlot(
+			final SlotRequestId slotRequestId,
+			final ResourceProfile resourceProfile,
+			final boolean willSlotBeOccupiedIndefinitely) {
+
+		if (willSlotBeOccupiedIndefinitely) {
+			return slotPool.requestNewAllocatedSlot(slotRequestId, resourceProfile, null);
+		} else {
+			return slotPool.requestNewAllocatedBatchSlot(slotRequestId, resourceProfile);
+		}
+	}
+
+	private void schedulePendingRequestBulkTimeoutCheck(
+			final PhysicalSlotRequestBulk slotRequestBulk,
+			final Time timeout) {
+
+		componentMainThreadExecutor.schedule(() -> {
+			final PhysicalSlotRequestBulkChecker.TimeoutCheckResult result =
+				slotRequestBulkChecker.checkPhysicalSlotRequestBulkTimeout(slotRequestBulk, timeout);
+
+			switch (result) {
+				case PENDING:
+					//re-schedule the timeout check
+					schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
+					break;
+				case TIMEOUT:
+					timeoutSlotRequestBulk(slotRequestBulk);
+					break;
+				default: // no action to take
+			}
+		}, timeout.getSize(), timeout.getUnit());
+	}
+
+	private void timeoutSlotRequestBulk(final PhysicalSlotRequestBulk slotRequestBulk) {
+		final Exception cause = new TimeoutException("Slot request bulk is not fulfillable!");
+		// pending requests must be canceled first otherwise they might be fulfilled by
+		// allocated slots released from this bulk
+		for (SlotRequestId slotRequestId : slotRequestBulk.getPendingRequests().keySet()) {
+			cancelSlotRequest(slotRequestId, cause);
+		}
+		for (SlotRequestId slotRequestId : slotRequestBulk.getFulfilledRequests().keySet()) {
+			cancelSlotRequest(slotRequestId, cause);
+		}
+	}
+
+	private Set<SlotInfo> getAllSlotInfos() {
+		return Stream
+			.concat(
+				slotPool.getAvailableSlotsInformation().stream(),
+				slotPool.getAllocatedSlotsInformation().stream())
+			.collect(Collectors.toSet());
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
new file mode 100644
index 0000000..b953e43
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+/**
+ * Represents a request for a physical slot.
+ */
+public class PhysicalSlotRequest {
+
+	private final SlotRequestId slotRequestId;
+
+	private final SlotProfile slotProfile;
+
+	private final boolean slotWillBeOccupiedIndefinitely;
+
+	public PhysicalSlotRequest(
+			final SlotRequestId slotRequestId,
+			final SlotProfile slotProfile,
+			final boolean slotWillBeOccupiedIndefinitely) {
+
+		this.slotRequestId = slotRequestId;
+		this.slotProfile = slotProfile;
+		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+	}
+
+	SlotRequestId getSlotRequestId() {
+		return slotRequestId;
+	}
+
+	SlotProfile getSlotProfile() {
+		return slotProfile;
+	}
+
+	boolean willSlotBeOccupiedIndefinitely() {
+		return slotWillBeOccupiedIndefinitely;
+	}
+
+	/**
+	 * Result of a {@link PhysicalSlotRequest}.
+	 */
+	public static class Result {
+
+		private final SlotRequestId slotRequestId;
+
+		private final PhysicalSlot physicalSlot;
+
+		Result(final SlotRequestId slotRequestId, final PhysicalSlot physicalSlot) {
+			this.slotRequestId = slotRequestId;
+			this.physicalSlot = physicalSlot;
+		}
+
+		public SlotRequestId getSlotRequestId() {
+			return slotRequestId;
+		}
+
+		public PhysicalSlot getPhysicalSlot() {
+			return physicalSlot;
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
new file mode 100644
index 0000000..0d63a3b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a bulk of physical slot requests.
+ */
+class PhysicalSlotRequestBulk {
+
+	private final Map<SlotRequestId, ResourceProfile> pendingRequests;
+
+	private final Map<SlotRequestId, AllocationID> fulfilledRequests = new HashMap<>();
+
+	private long unfulfillableTimestamp = Long.MAX_VALUE;
+
+	PhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest> physicalSlotRequests) {
+		this.pendingRequests = physicalSlotRequests.stream()
+			.collect(Collectors.toMap(
+				PhysicalSlotRequest::getSlotRequestId,
+				r -> r.getSlotProfile().getPhysicalSlotResourceProfile()));
+	}
+
+	void markRequestFulfilled(final SlotRequestId slotRequestId, final AllocationID allocationID) {
+		pendingRequests.remove(slotRequestId);
+		fulfilledRequests.put(slotRequestId, allocationID);
+	}
+
+	Map<SlotRequestId, ResourceProfile> getPendingRequests() {
+		return Collections.unmodifiableMap(pendingRequests);
+	}
+
+	Map<SlotRequestId, AllocationID> getFulfilledRequests() {
+		return Collections.unmodifiableMap(fulfilledRequests);
+	}
+
+	void markFulfillable() {
+		unfulfillableTimestamp = Long.MAX_VALUE;
+	}
+
+	void markUnfulfillable(final long currentTimestamp) {
+		if (isFulfillable()) {
+			unfulfillableTimestamp = currentTimestamp;
+		}
+	}
+
+	long getUnfulfillableSince() {
+		return unfulfillableTimestamp;
+	}
+
+	private boolean isFulfillable() {
+		return unfulfillableTimestamp == Long.MAX_VALUE;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
new file mode 100644
index 0000000..98ac706
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class helps to check the status of physical slot request bulks.
+ */
+class PhysicalSlotRequestBulkChecker {
+
+	private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+	private final Clock clock;
+
+	PhysicalSlotRequestBulkChecker(final Supplier<Set<SlotInfo>> slotsRetriever, final Clock clock) {
+		this.slotsRetriever = checkNotNull(slotsRetriever);
+		this.clock = checkNotNull(clock);
+	}
+
+	PhysicalSlotRequestBulk createPhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest> physicalSlotRequests) {
+		final PhysicalSlotRequestBulk slotRequestBulk = new PhysicalSlotRequestBulk(physicalSlotRequests);
+		slotRequestBulk.markUnfulfillable(clock.relativeTimeMillis());
+
+		return slotRequestBulk;
+	}
+
+	/**
+	 * Checks the slot request bulk and reports a timeout if it has been unfulfillable for too long.
+	 * @param slotRequestBulk bulk of slot requests
+	 * @param slotRequestTimeout indicates how long a pending request can be unfulfillable
+	 * @return result of the check, indicating the bulk is fulfilled, still pending, or timed out
+	 */
+	TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+			final PhysicalSlotRequestBulk slotRequestBulk,
+			final Time slotRequestTimeout) {
+
+		if (slotRequestBulk.getPendingRequests().isEmpty()) {
+			return TimeoutCheckResult.FULFILLED;
+		}
+
+		final boolean fulfillable = isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+		if (fulfillable) {
+			slotRequestBulk.markFulfillable();
+		} else {
+			final long currentTimestamp = clock.relativeTimeMillis();
+
+			slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+			final long unfulfillableSince = slotRequestBulk.getUnfulfillableSince();
+			if (unfulfillableSince + slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+				return TimeoutCheckResult.TIMEOUT;
+			}
+		}
+
+		return TimeoutCheckResult.PENDING;
+	}
+
+	/**
+	 * Returns whether all pending requests of the given bulk can be fulfilled at the same time with
+	 * the reusable slots in the slot pool. A reusable slot is a slot that is not already assigned to
+	 * this bulk and that is available or will not be occupied indefinitely.
+	 *
+	 * @param slotRequestBulk bulk of slot requests to check
+	 * @param slotsRetriever supplies slots to be used for the fulfill-ability check
+	 * @return true if the slot requests are possible to be fulfilled, otherwise false
+	 */
+	@VisibleForTesting
+	static boolean isSlotRequestBulkFulfillable(
+			final PhysicalSlotRequestBulk slotRequestBulk,
+			final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+		final Set<AllocationID> assignedSlots = new HashSet<>(slotRequestBulk.getFulfilledRequests().values());
+		final Set<SlotInfo> reusableSlots = getReusableSlots(slotsRetriever, assignedSlots);
+		return areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests().values(), reusableSlots);
+	}
+
+	private static Set<SlotInfo> getReusableSlots(
+			final Supplier<Set<SlotInfo>> slotsRetriever,
+			final Set<AllocationID> slotsToExclude) {
+
+		return slotsRetriever.get().stream()
+			.filter(slotInfo -> !slotInfo.willBeOccupiedIndefinitely())
+			.filter(slotInfo -> !slotsToExclude.contains(slotInfo.getAllocationId()))
+			.collect(Collectors.toSet());
+	}
+
+	private static boolean areRequestsFulfillableWithSlots(
+			final Collection<ResourceProfile> requestResourceProfiles,
+			final Set<SlotInfo> slots) {
+
+		final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+		for (ResourceProfile requestResourceProfile : requestResourceProfiles) {
+			final Optional<SlotInfo> matchedSlot = findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+			if (matchedSlot.isPresent()) {
+				remainingSlots.remove(matchedSlot.get());
+			} else {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static Optional<SlotInfo> findMatchingSlotForRequest(
+			final ResourceProfile requestResourceProfile,
+			final Collection<SlotInfo> slots) {
+
+		return slots.stream().filter(slot -> slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
+	}
+
+	enum TimeoutCheckResult {
+		PENDING,
+
+		FULFILLED,
+
+		TIMEOUT
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 4392e70..89932c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -191,9 +191,7 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
 	 * Disables batch slot request timeout check. Invoked when someone else wants to
 	 * take over the timeout check responsibility.
 	 */
-	default void disableBatchSlotRequestTimeoutCheck() {
-		throw new UnsupportedOperationException("Not properly implemented.");
-	}
+	void disableBatchSlotRequestTimeoutCheck();
 
 	/**
 	 * Create report about the allocated slots belonging to the specified task manager.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index d9407f5..d305585 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -134,7 +134,7 @@ public class SlotPoolImpl implements SlotPool {
 
 	private ComponentMainThreadExecutor componentMainThreadExecutor;
 
-	private boolean batchSlotRequestTimeoutCheckEnabled;
+	protected boolean batchSlotRequestTimeoutCheckEnabled;
 
 	// ------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 24f0f20..ce1716b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -610,6 +610,11 @@ public class JobMasterTest extends TestLogger {
 		}
 
 		@Override
+		public void disableBatchSlotRequestTimeoutCheck() {
+			// no action and no exception is expected
+		}
+
+		@Override
 		public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
 			final Collection<SlotInfo> slotInfos = registeredSlots.getOrDefault(taskManagerId, Collections.emptyList());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java
new file mode 100644
index 0000000..8d4d0ad
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImplTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link BulkSlotProviderImpl}.
+ */
+public class BulkSlotProviderImplTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.milliseconds(1000L);
+
+	private static ScheduledExecutorService singleThreadScheduledExecutorService;
+
+	private static ComponentMainThreadExecutor mainThreadExecutor;
+
+	private TestingSlotPoolImpl slotPool;
+
+	private BulkSlotProviderImpl bulkSlotProvider;
+
+	private ManualClock clock;
+
+	@BeforeClass
+	public static void setupClass() {
+		singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+		mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
+	}
+
+	@AfterClass
+	public static void teardownClass() {
+		if (singleThreadScheduledExecutorService != null) {
+			singleThreadScheduledExecutorService.shutdownNow();
+		}
+	}
+
+	@Before
+	public void setup() throws Exception {
+		clock = new ManualClock();
+
+		slotPool = new SlotPoolBuilder(mainThreadExecutor).build();
+
+		bulkSlotProvider = new BulkSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
+		bulkSlotProvider.start(mainThreadExecutor);
+	}
+
+	@After
+	public void teardown() {
+		CompletableFuture.runAsync(() -> slotPool.close(), mainThreadExecutor).join();
+	}
+
+	@Test
+	public void testBulkSlotAllocationFulfilledWithAvailableSlots() throws Exception {
+		final PhysicalSlotRequest request1 = createPhysicalSlotRequest();
+		final PhysicalSlotRequest request2 = createPhysicalSlotRequest();
+		final List<PhysicalSlotRequest> requests = Arrays.asList(request1, request2);
+
+		addSlotToSlotPool();
+		addSlotToSlotPool();
+
+		final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);
+
+		final Collection<PhysicalSlotRequest.Result> results = slotFutures.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+		final Collection<SlotRequestId> resultRequestIds = results.stream()
+			.map(PhysicalSlotRequest.Result::getSlotRequestId)
+			.collect(Collectors.toList());
+
+		assertThat(resultRequestIds, containsInAnyOrder(request1.getSlotRequestId(), request2.getSlotRequestId()));
+	}
+
+	@Test
+	public void testBulkSlotAllocationFulfilledWithNewSlots() {
+		final List<PhysicalSlotRequest> requests = Arrays.asList(
+			createPhysicalSlotRequest(),
+			createPhysicalSlotRequest());
+		final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);
+
+		addSlotToSlotPool();
+
+		assertThat(slotFutures.isDone(), is(false));
+
+		addSlotToSlotPool();
+
+		assertThat(slotFutures.isDone(), is(true));
+		assertThat(slotFutures.isCompletedExceptionally(), is(false));
+	}
+
+	@Test
+	public void testBulkSlotAllocationTimeoutsIfUnfulfillable() {
+		final Exception exception = allocateSlotsAndWaitForTimeout();
+
+		final Optional<Throwable> cause = ExceptionUtils.findThrowableWithMessage(
+			exception,
+			"Slot request bulk is not fulfillable!");
+		assertThat(cause.isPresent(), is(true));
+		assertThat(cause.get(), instanceOf(TimeoutException.class));
+	}
+
+	@Test
+	public void testFailedBulkSlotAllocationReleasesAllocatedSlot() {
+		allocateSlotsAndWaitForTimeout();
+
+		assertThat(slotPool.getAllocatedSlots().listSlotInfo(), hasSize(0));
+	}
+
+	@Test
+	public void testFailedBulkSlotAllocationClearsPendingRequests() {
+		allocateSlotsAndWaitForTimeout();
+
+		assertThat(slotPool.getPendingRequests().values(), hasSize(0));
+	}
+
+	private Exception allocateSlotsAndWaitForTimeout() {
+		final List<PhysicalSlotRequest> requests = Arrays.asList(
+			createPhysicalSlotRequest(),
+			createPhysicalSlotRequest());
+		final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);
+
+		addSlotToSlotPool();
+
+		assertThat(slotPool.getAllocatedSlots().listSlotInfo(), hasSize(1));
+
+		clock.advanceTime(TIMEOUT.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
+
+		try {
+			// wait until the requests time out
+			slotFutures.get();
+
+			fail("Expected that the slot futures time out.");
+			return new Exception("Unexpected");
+		} catch (Exception e) {
+			// expected
+			return e;
+		}
+	}
+
+	@Test
+	public void testIndividualBatchSlotRequestTimeoutCheckIsDisabled() {
+		assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false));
+	}
+
+	private void addSlotToSlotPool() {
+		SlotPoolUtils.offerSlots(slotPool, mainThreadExecutor, Collections.singletonList(ResourceProfile.ANY));
+	}
+
+	private CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocateSlots(
+			final Collection<PhysicalSlotRequest> requests) {
+
+		return CompletableFuture
+			.supplyAsync(
+				() -> bulkSlotProvider.allocatePhysicalSlots(
+					requests,
+					TIMEOUT),
+				mainThreadExecutor)
+			.thenCompose(Function.identity());
+	}
+
+	private static PhysicalSlotRequest createPhysicalSlotRequest() {
+		return new PhysicalSlotRequest(
+			new SlotRequestId(),
+			SlotProfile.noLocality(ResourceProfile.UNKNOWN),
+			true);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
new file mode 100644
index 0000000..557ba54
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulkChecker}.
+ */
+public class PhysicalSlotRequestBulkCheckerTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.milliseconds(5000L);
+
+	private final ManualClock clock = new ManualClock();
+
+	private PhysicalSlotRequestBulkChecker bulkChecker;
+
+	private Set<PhysicalSlot> slots;
+
+	private Supplier<Set<SlotInfo>> slotsRetriever;
+
+	@Before
+	public void setup() throws Exception {
+		slots = new HashSet<>();
+		slotsRetriever = () -> new HashSet<>(slots);
+		bulkChecker = new PhysicalSlotRequestBulkChecker(slotsRetriever, clock);
+	}
+
+	@Test
+	public void testCreateBulk() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(Collections.emptyList());
+
+		assertThat(bulk.getUnfulfillableSince(), is(clock.relativeTimeMillis()));
+	}
+
+	@Test
+	public void testBulkFulfilledOnCheck() {
+		final PhysicalSlotRequest request = createPhysicalSlotRequest();
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Collections.singletonList(request));
+
+		bulk.markRequestFulfilled(request.getSlotRequestId(), new AllocationID());
+
+		assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.FULFILLED));
+	}
+
+	@Test
+	public void testBulkTimeoutOnCheck() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(createPhysicalSlotRequest()));
+
+		clock.advanceTime(TIMEOUT.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
+		assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.TIMEOUT));
+	}
+
+	@Test
+	public void testBulkPendingOnCheckIfFulfillable() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Collections.singletonList(createPhysicalSlotRequest()));
+
+		final PhysicalSlot slot = addOneSlot();
+		occupySlot(slot, false);
+
+		assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.PENDING));
+	}
+
+	@Test
+	public void testBulkPendingOnCheckIfUnfulfillableButNotTimedOut() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Collections.singletonList(createPhysicalSlotRequest()));
+
+		assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.PENDING));
+	}
+
+	@Test
+	public void testBulkFulfillable() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Collections.singletonList(createPhysicalSlotRequest()));
+
+		addOneSlot();
+
+		assertThat(isFulfillable(bulk), is(true));
+	}
+
+	@Test
+	public void testBulkUnfulfillableWithInsufficientSlots() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(createPhysicalSlotRequest(), createPhysicalSlotRequest()));
+
+		addOneSlot();
+
+		assertThat(isFulfillable(bulk), is(false));
+	}
+
+	@Test
+	public void testBulkUnfulfillableWithSlotAlreadyAssignedToBulk() {
+		final PhysicalSlotRequest request = createPhysicalSlotRequest();
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(request, createPhysicalSlotRequest()));
+
+		final PhysicalSlot slot = addOneSlot();
+
+		bulk.markRequestFulfilled(request.getSlotRequestId(), slot.getAllocationId());
+
+		assertThat(isFulfillable(bulk), is(false));
+	}
+
+	@Test
+	public void testBulkUnfulfillableWithSlotOccupiedIndefinitely() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(createPhysicalSlotRequest(), createPhysicalSlotRequest()));
+
+		final PhysicalSlot slot1 = addOneSlot();
+		addOneSlot();
+
+		occupySlot(slot1, true);
+
+		assertThat(isFulfillable(bulk), is(false));
+	}
+
+	@Test
+	public void testBulkFulfillableWithSlotOccupiedTemporarily() {
+		final PhysicalSlotRequestBulk bulk = bulkChecker.createPhysicalSlotRequestBulk(
+			Arrays.asList(createPhysicalSlotRequest(), createPhysicalSlotRequest()));
+
+		final PhysicalSlot slot1 = addOneSlot();
+		addOneSlot();
+
+		occupySlot(slot1, false);
+
+		assertThat(isFulfillable(bulk), is(true));
+	}
+
+	private static PhysicalSlotRequest createPhysicalSlotRequest() {
+		return new PhysicalSlotRequest(
+			new SlotRequestId(),
+			SlotProfile.noLocality(ResourceProfile.UNKNOWN),
+			true);
+	}
+
+	private static void occupySlot(final PhysicalSlot slotToOccupy, final boolean slotWillBeOccupiedIndefinitely) {
+		SingleLogicalSlot.allocateFromPhysicalSlot(
+			new SlotRequestId(),
+			slotToOccupy,
+			Locality.UNKNOWN,
+			new TestingSlotOwner(),
+			slotWillBeOccupiedIndefinitely);
+	}
+
+	private PhysicalSlot addOneSlot() {
+		final PhysicalSlot slot = new AllocatedSlot(
+			new AllocationID(),
+			new LocalTaskManagerLocation(),
+			0,
+			ResourceProfile.ANY,
+			new SimpleAckingTaskManagerGateway());
+		slots.add(slot);
+
+		return slot;
+	}
+
+	private PhysicalSlotRequestBulkChecker.TimeoutCheckResult checkBulkTimeout(final PhysicalSlotRequestBulk bulk) {
+		return bulkChecker.checkPhysicalSlotRequestBulkTimeout(bulk, TIMEOUT);
+	}
+
+	private boolean isFulfillable(final PhysicalSlotRequestBulk bulk) {
+		return PhysicalSlotRequestBulkChecker.isSlotRequestBulkFulfillable(bulk, slotsRetriever);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java
new file mode 100644
index 0000000..cf6c556
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulk}.
+ */
+public class PhysicalSlotRequestBulkTest extends TestLogger {
+
+	private final ManualClock clock = new ManualClock();
+
+	@Test
+	public void testMarkBulkUnfulfillable() {
+		final PhysicalSlotRequestBulk bulk = new PhysicalSlotRequestBulk(Collections.emptyList());
+
+		clock.advanceTime(456, TimeUnit.MILLISECONDS);
+		bulk.markUnfulfillable(clock.relativeTimeMillis());
+
+		assertThat(bulk.getUnfulfillableSince(), is(clock.relativeTimeMillis()));
+	}
+
+	@Test
+	public void testUnfulfillableTimestampWillNotBeOverriddenByFollowingUnfulfillableTimestamp() {
+		final PhysicalSlotRequestBulk bulk = new PhysicalSlotRequestBulk(Collections.emptyList());
+
+		final long unfulfillableSince = clock.relativeTimeMillis();
+		bulk.markUnfulfillable(unfulfillableSince);
+
+		clock.advanceTime(456, TimeUnit.MILLISECONDS);
+		bulk.markUnfulfillable(clock.relativeTimeMillis());
+
+		assertThat(bulk.getUnfulfillableSince(), is(unfulfillableSince));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
index 776b51e..5006c8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
@@ -65,6 +65,10 @@ public class TestingSlotPoolImpl extends SlotPoolImpl {
 		runAsync(this::checkBatchSlotTimeout);
 	}
 
+	boolean isBatchSlotRequestTimeoutCheckEnabled() {
+		return batchSlotRequestTimeoutCheckEnabled;
+	}
+
 	@Override
 	public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
 			final SlotRequestId slotRequestId,


[flink] 01/10: [hotfix] Fix checkstyle violations in AllocatedSlot

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dca88480bf7316c8a999dcf95b8c7d1a2805bed7
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon May 18 12:09:03 2020 +0800

    [hotfix] Fix checkstyle violations in AllocatedSlot
---
 .../runtime/jobmaster/slotpool/AllocatedSlot.java      | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
index 7463762..210ff3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
@@ -32,27 +32,27 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * The {@code AllocatedSlot} represents a slot that the JobMaster allocated from a TaskExecutor.
  * It represents a slice of allocated resources from the TaskExecutor.
- * 
+ *
  * <p>To allocate an {@code AllocatedSlot}, the requests a slot from the ResourceManager. The
  * ResourceManager picks (or starts) a TaskExecutor that will then allocate the slot to the
  * JobMaster and notify the JobMaster.
- * 
+ *
  * <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6),
  * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
- * JobManager. All slots had a default unknown resource profile. 
+ * JobManager. All slots had a default unknown resource profile.
  */
 class AllocatedSlot implements PhysicalSlot {
 
 	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
 	private final AllocationID allocationId;
 
-	/** The location information of the TaskManager to which this slot belongs */
+	/** The location information of the TaskManager to which this slot belongs. */
 	private final TaskManagerLocation taskManagerLocation;
 
-	/** The resource profile of the slot provides */
+	/** The resource profile that the slot provides. */
 	private final ResourceProfile resourceProfile;
 
-	/** RPC gateway to call the TaskManager that holds this slot */
+	/** RPC gateway to call the TaskManager that holds this slot. */
 	private final TaskManagerGateway taskManagerGateway;
 
 	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
@@ -93,9 +93,9 @@ class AllocatedSlot implements PhysicalSlot {
 
 	/**
 	 * Gets the ID of the TaskManager on which this slot was allocated.
-	 * 
-	 * <p>This is equivalent to {@link #getTaskManagerLocation()#getTaskManagerId()}.
-	 * 
+	 *
+	 * <p>This is equivalent to {@link #getTaskManagerLocation()}.{@link #getTaskManagerId()}.
+	 *
 	 * @return This slot's TaskManager's ID.
 	 */
 	public ResourceID getTaskManagerId() {


[flink] 08/10: [FLINK-17017][runtime] Allow nullable timeout for streaming slot request in slot pool

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c16a3468c65b876ac687a3805411aa3c7a9f295
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 4 15:46:45 2020 +0800

    [FLINK-17017][runtime] Allow nullable timeout for streaming slot request in slot pool
---
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |  3 ++-
 .../runtime/jobmaster/slotpool/SlotPoolImpl.java   | 30 ++++++++++++----------
 .../flink/runtime/jobmaster/JobMasterTest.java     |  5 +++-
 .../jobmaster/slotpool/TestingSlotPoolImpl.java    |  4 ++-
 4 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 8b4202c6..4392e70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collection;
 import java.util.Optional;
@@ -170,7 +171,7 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
 	CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
 		@Nonnull SlotRequestId slotRequestId,
 		@Nonnull ResourceProfile resourceProfile,
-		Time timeout);
+		@Nullable Time timeout);
 
 	/**
 	 * Requests the allocation of a new batch slot from the resource manager. Unlike the normal slot, a batch
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index 66d36ac..d9407f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -419,25 +419,27 @@ public class SlotPoolImpl implements SlotPool {
 	public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
 			@Nonnull SlotRequestId slotRequestId,
 			@Nonnull ResourceProfile resourceProfile,
-			Time timeout) {
+			@Nullable Time timeout) {
 
 		componentMainThreadExecutor.assertRunningInMainThread();
 
 		final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);
 
-		// register request timeout
-		FutureUtils
-			.orTimeout(
-				pendingRequest.getAllocatedSlotFuture(),
-				timeout.toMilliseconds(),
-				TimeUnit.MILLISECONDS,
-				componentMainThreadExecutor)
-			.whenComplete(
-				(AllocatedSlot ignored, Throwable throwable) -> {
-					if (throwable instanceof TimeoutException) {
-						timeoutPendingSlotRequest(slotRequestId);
-					}
-				});
+		if (timeout != null) {
+			// register request timeout
+			FutureUtils
+				.orTimeout(
+					pendingRequest.getAllocatedSlotFuture(),
+					timeout.toMilliseconds(),
+					TimeUnit.MILLISECONDS,
+					componentMainThreadExecutor)
+				.whenComplete(
+					(AllocatedSlot ignored, Throwable throwable) -> {
+						if (throwable instanceof TimeoutException) {
+							timeoutPendingSlotRequest(slotRequestId);
+						}
+					});
+		}
 
 		return requestNewAllocatedSlotInternal(pendingRequest)
 			.thenApply((Function.identity()));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 724129d..24f0f20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -596,7 +596,10 @@ public class JobMasterTest extends TestLogger {
 
 		@Nonnull
 		@Override
-		public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, Time timeout) {
+		public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
+				@Nonnull SlotRequestId slotRequestId,
+				@Nonnull ResourceProfile resourceProfile,
+				@Nullable Time timeout) {
 			return new CompletableFuture<>();
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
index c007def1..776b51e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolImpl.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -67,7 +69,7 @@ public class TestingSlotPoolImpl extends SlotPoolImpl {
 	public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
 			final SlotRequestId slotRequestId,
 			final ResourceProfile resourceProfile,
-			final Time timeout) {
+			@Nullable final Time timeout) {
 
 		this.lastRequestedSlotResourceProfile = resourceProfile;
 


[flink] 10/10: [FLINK-17017][runtime] SchedulerImpl supports bulk slot allocation

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9aee0c0fb2121207bfdd7f824b61ff079b8ff884
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jun 5 18:42:43 2020 +0800

    [FLINK-17017][runtime] SchedulerImpl supports bulk slot allocation
---
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  | 14 ++++++++
 .../runtime/jobmaster/slotpool/SlotProvider.java   | 38 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index f7a518f..af87e9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -77,6 +77,8 @@ public class SchedulerImpl implements Scheduler {
 	@Nonnull
 	private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;
 
+	private final BulkSlotProvider bulkSlotProvider;
+
 	public SchedulerImpl(
 		@Nonnull SlotSelectionStrategy slotSelectionStrategy,
 		@Nonnull SlotPool slotPool) {
@@ -95,11 +97,15 @@ public class SchedulerImpl implements Scheduler {
 		this.componentMainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
 			"Scheduler is not initialized with proper main thread executor. " +
 				"Call to Scheduler.start(...) required.");
+
+		this.bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
 	}
 
 	@Override
 	public void start(@Nonnull ComponentMainThreadExecutor mainThreadExecutor) {
 		this.componentMainThreadExecutor = mainThreadExecutor;
+
+		bulkSlotProvider.start(mainThreadExecutor);
 	}
 
 	//---------------------------
@@ -560,4 +566,12 @@ public class SchedulerImpl implements Scheduler {
 	public boolean requiresPreviousExecutionGraphAllocations() {
 		return slotSelectionStrategy instanceof PreviousAllocationSlotSelectionStrategy;
 	}
+
+	@Override
+	public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+			final Collection<PhysicalSlotRequest> physicalSlotRequests,
+			final Time timeout) {
+
+		return bulkSlotProvider.allocatePhysicalSlots(physicalSlotRequests, timeout);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
index 36da2c7..747a6b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -40,7 +42,16 @@ import java.util.concurrent.CompletableFuture;
  *         fulfilled as soon as a slot becomes available.</li>
  * </ul>
  */
-public interface SlotProvider {
+public interface SlotProvider extends BulkSlotProvider {
+
+	/**
+	 * Starts the slot provider by initializing the main thread executor.
+	 *
+	 * @param mainThreadExecutor the main thread executor of the job master
+	 */
+	default void start(ComponentMainThreadExecutor mainThreadExecutor) {
+		throw new UnsupportedOperationException("Not properly implemented.");
+	}
 
 	/**
 	 * Allocating slot with specific requirement.
@@ -92,6 +103,20 @@ public interface SlotProvider {
 	}
 
 	/**
+	 * Allocates a bulk of physical slots. The allocation will be completed
+	 * normally only when all the requests are fulfilled.
+	 *
+	 * @param physicalSlotRequests requests for physical slots
+	 * @param timeout indicating how long it is accepted that the slot requests can be unfulfillable
+	 * @return future of the results of slot requests
+	 */
+	default CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+		Collection<PhysicalSlotRequest> physicalSlotRequests,
+		Time timeout) {
+		throw new UnsupportedOperationException("Not properly implemented.");
+	}
+
+	/**
 	 * Cancels the slot request with the given {@link SlotRequestId} and {@link SlotSharingGroupId}.
 	 *
 	 * @param slotRequestId identifying the slot request to cancel
@@ -102,4 +127,15 @@ public interface SlotProvider {
 		SlotRequestId slotRequestId,
 		@Nullable SlotSharingGroupId slotSharingGroupId,
 		Throwable cause);
+
+	/**
+	 * Cancels the slot request with the given {@link SlotRequestId}. If the request is already fulfilled
+	 * with a physical slot, the slot will be released.
+	 *
+	 * @param slotRequestId identifying the slot request to cancel
+	 * @param cause of the cancellation
+	 */
+	default void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause) {
+		cancelSlotRequest(slotRequestId, null, cause);
+	}
 }


[flink] 03/10: [FLINK-17017][runtime] Allow to set whether a physical slot payload will occupy the slot indefinitely

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7054f97350efec83e3bf8b10b9b2c44f1a149cf1
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon May 18 12:40:53 2020 +0800

    [FLINK-17017][runtime] Allow to set whether a physical slot payload will occupy the slot indefinitely
---
 .../runtime/jobmaster/slotpool/PhysicalSlot.java   |  9 +++++++-
 .../jobmaster/slotpool/SingleLogicalSlot.java      | 24 +++++++++++++++++++++-
 .../jobmaster/slotpool/SlotSharingManager.java     |  5 +++++
 3 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java
index 54c2d8a..9c91801 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlot.java
@@ -41,10 +41,17 @@ public interface PhysicalSlot extends SlotContext {
 	interface Payload {
 
 		/**
-		 * Releases the payload
+		 * Releases the payload.
 		 *
 		 * @param cause of the payload release
 		 */
 		void release(Throwable cause);
+
+		/**
+		 * Returns whether the payload will occupy a physical slot indefinitely.
+		 *
+		 * @return true if the payload will occupy a physical slot indefinitely, otherwise false
+		 */
+		boolean willOccupySlotIndefinitely();
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index 707bba9..e98b57e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
@@ -70,17 +71,33 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
 	// LogicalSlot.Payload of this slot
 	private volatile Payload payload;
 
+	/** Whether this logical slot will be occupied indefinitely. */
+	private boolean willBeOccupiedIndefinitely;
+
+	@VisibleForTesting
+	public SingleLogicalSlot(
+		SlotRequestId slotRequestId,
+		SlotContext slotContext,
+		@Nullable SlotSharingGroupId slotSharingGroupId,
+		Locality locality,
+		SlotOwner slotOwner) {
+
+		this(slotRequestId, slotContext, slotSharingGroupId, locality, slotOwner, true);
+	}
+
 	public SingleLogicalSlot(
 			SlotRequestId slotRequestId,
 			SlotContext slotContext,
 			@Nullable SlotSharingGroupId slotSharingGroupId,
 			Locality locality,
-			SlotOwner slotOwner) {
+			SlotOwner slotOwner,
+			boolean willBeOccupiedIndefinitely) {
 		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
 		this.slotContext = Preconditions.checkNotNull(slotContext);
 		this.slotSharingGroupId = slotSharingGroupId;
 		this.locality = Preconditions.checkNotNull(locality);
 		this.slotOwner = Preconditions.checkNotNull(slotOwner);
+		this.willBeOccupiedIndefinitely = willBeOccupiedIndefinitely;
 		this.releaseFuture = new CompletableFuture<>();
 
 		this.state = State.ALIVE;
@@ -168,6 +185,11 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
 		releaseFuture.complete(null);
 	}
 
+	@Override
+	public boolean willOccupySlotIndefinitely() {
+		return willBeOccupiedIndefinitely;
+	}
+
 	private void signalPayloadRelease(Throwable cause) {
 		tryAssignPayload(TERMINATED_PAYLOAD);
 		payload.fail(cause);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index bd0675a..28b0cb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -574,6 +574,11 @@ public class SlotSharingManager {
 		}
 
 		@Override
+		public boolean willOccupySlotIndefinitely() {
+			throw new UnsupportedOperationException("Shared slot are not allowed for slot occupation check.");
+		}
+
+		@Override
 		public ResourceProfile getReservedResources() {
 			return reservedResources;
 		}


[flink] 02/10: [hotfix][runtime] Set root cause to pending request released exception

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9c2e15aab6a69084305100fd353cc295ff858206
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Wed May 20 01:01:33 2020 +0800

    [hotfix][runtime] Set root cause to pending request released exception
---
 .../org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index 44ba3ff..cd0024a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -473,7 +473,9 @@ public class SlotPoolImpl implements SlotPool {
 		final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
 
 		if (pendingRequest != null) {
-			failPendingRequest(pendingRequest, new FlinkException("Pending slot request with " + slotRequestId + " has been released."));
+			failPendingRequest(
+				pendingRequest,
+				new FlinkException("Pending slot request with " + slotRequestId + " has been released.", cause));
 		} else {
 			final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
 


[flink] 04/10: [FLINK-17017][runtime] Add SingleLogicalSlot#allocateFromPhysicalSlot(…) for physical slot assignment

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 636e36cbd5bc2f45e0e502e0fec9d7723b06dd7a
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 4 15:59:57 2020 +0800

    [FLINK-17017][runtime] Add SingleLogicalSlot#allocateFromPhysicalSlot(…) for physical slot assignment
---
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  | 20 +++++++++-----------
 .../jobmaster/slotpool/SingleLogicalSlot.java      | 22 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index 91e6631..f7a518f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -244,18 +244,16 @@ public class SchedulerImpl implements Scheduler {
 
 		final PhysicalSlot allocatedSlot = slotAndLocality.getSlot();
 
-		final SingleLogicalSlot singleTaskSlot = new SingleLogicalSlot(
-			slotRequestId,
-			allocatedSlot,
-			null,
-			slotAndLocality.getLocality(),
-			this);
-
-		if (allocatedSlot.tryAssignPayload(singleTaskSlot)) {
+		try {
+			final SingleLogicalSlot singleTaskSlot = SingleLogicalSlot.allocateFromPhysicalSlot(
+				slotRequestId,
+				allocatedSlot,
+				slotAndLocality.getLocality(),
+				this,
+				true);
 			return singleTaskSlot;
-		} else {
-			final FlinkException flinkException =
-				new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.');
+		} catch (Throwable t) {
+			final FlinkException flinkException = new FlinkException(t);
 			slotPool.releaseSlot(slotRequestId, flinkException);
 			throw flinkException;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index e98b57e..710f003 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -166,6 +166,28 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
 		return slotSharingGroupId;
 	}
 
+	static SingleLogicalSlot allocateFromPhysicalSlot(
+			final SlotRequestId slotRequestId,
+			final PhysicalSlot physicalSlot,
+			final Locality locality,
+			final SlotOwner slotOwner,
+			final boolean slotWillBeOccupiedIndefinitely) {
+
+		final SingleLogicalSlot singleTaskSlot = new SingleLogicalSlot(
+			slotRequestId,
+			physicalSlot,
+			null,
+			locality,
+			slotOwner,
+			slotWillBeOccupiedIndefinitely);
+
+		if (physicalSlot.tryAssignPayload(singleTaskSlot)) {
+			return singleTaskSlot;
+		} else {
+			throw new IllegalStateException("BUG: Unexpected physical slot payload assignment failure!");
+		}
+	}
+
 	// -------------------------------------------------------------------------
 	// AllocatedSlot.Payload implementation
 	// -------------------------------------------------------------------------


[flink] 07/10: [FLINK-17017][runtime] Allow to disable batch slot request timeout check

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 541f28fe4e0f90bde479fa457841e3cbc610f2ef
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 4 15:33:27 2020 +0800

    [FLINK-17017][runtime] Allow to disable batch slot request timeout check
---
 .../apache/flink/runtime/jobmaster/slotpool/SlotPool.java   |  8 ++++++++
 .../flink/runtime/jobmaster/slotpool/SlotPoolImpl.java      | 13 +++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 484e810..8b4202c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -187,6 +187,14 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable {
 		@Nonnull ResourceProfile resourceProfile);
 
 	/**
+	 * Disables batch slot request timeout check. Invoked when someone else wants to
+	 * take over the timeout check responsibility.
+	 */
+	default void disableBatchSlotRequestTimeoutCheck() {
+		throw new UnsupportedOperationException("Not properly implemented.");
+	}
+
+	/**
 	 * Create report about the allocated slots belonging to the specified task manager.
 	 *
 	 * @param taskManagerId identifies the task manager
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
index 1f1bc66..66d36ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
@@ -134,6 +134,8 @@ public class SlotPoolImpl implements SlotPool {
 
 	private ComponentMainThreadExecutor componentMainThreadExecutor;
 
+	private boolean batchSlotRequestTimeoutCheckEnabled;
+
 	// ------------------------------------------------------------------------
 
 	public SlotPoolImpl(
@@ -160,6 +162,8 @@ public class SlotPoolImpl implements SlotPool {
 		this.jobManagerAddress = null;
 
 		this.componentMainThreadExecutor = null;
+
+		this.batchSlotRequestTimeoutCheckEnabled = true;
 	}
 
 	// ------------------------------------------------------------------------
@@ -454,6 +458,11 @@ public class SlotPoolImpl implements SlotPool {
 	}
 
 	@Override
+	public void disableBatchSlotRequestTimeoutCheck() {
+		batchSlotRequestTimeoutCheckEnabled = false;
+	}
+
+	@Override
 	@Nonnull
 	public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
 		final Map<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = availableSlots.getSlotsByTaskManager();
@@ -874,6 +883,10 @@ public class SlotPoolImpl implements SlotPool {
 	}
 
 	protected void checkBatchSlotTimeout() {
+		if (!batchSlotRequestTimeoutCheckEnabled) {
+			return;
+		}
+
 		final Collection<PendingRequest> pendingBatchRequests = getPendingBatchRequests();
 
 		if (!pendingBatchRequests.isEmpty()) {


[flink] 05/10: [FLINK-17017][runtime] Enable to get whether a physical slot will be occupied indefinitely

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ccc4d8e0d807ee2fff7d286cbecd904686aec44f
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 4 16:08:15 2020 +0800

    [FLINK-17017][runtime] Enable to get whether a physical slot will be occupied indefinitely
---
 .../apache/flink/runtime/jobmaster/SlotInfo.java   |  7 ++
 .../runtime/jobmaster/slotpool/AllocatedSlot.java  |  5 ++
 .../slotpool/SlotInfoWithUtilization.java          |  5 ++
 .../flink/runtime/instance/SimpleSlotContext.java  |  5 ++
 .../slotpool/AllocatedSlotOccupationTest.java      | 77 ++++++++++++++++++++++
 5 files changed, 99 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
index 09b49f5..5018c03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
@@ -55,4 +55,11 @@ public interface SlotInfo {
 	 * @return the resource profile of the slot.
 	 */
 	ResourceProfile getResourceProfile();
+
+	/**
+	 * Returns whether the slot will be occupied indefinitely.
+	 *
+	 * @return true if the slot will be occupied indefinitely, otherwise false.
+	 */
+	boolean willBeOccupiedIndefinitely();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
index 210ff3d..e3b3a17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
@@ -108,6 +108,11 @@ class AllocatedSlot implements PhysicalSlot {
 	}
 
 	@Override
+	public boolean willBeOccupiedIndefinitely() {
+		return isUsed() && payloadReference.get().willOccupySlotIndefinitely();
+	}
+
+	@Override
 	public TaskManagerLocation getTaskManagerLocation() {
 		return taskManagerLocation;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotInfoWithUtilization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotInfoWithUtilization.java
index 31b03e1..6a6ee7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotInfoWithUtilization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotInfoWithUtilization.java
@@ -59,6 +59,11 @@ public final class SlotInfoWithUtilization implements SlotInfo {
 		return slotInfoDelegate.getResourceProfile();
 	}
 
+	@Override
+	public boolean willBeOccupiedIndefinitely() {
+		return slotInfoDelegate.willBeOccupiedIndefinitely();
+	}
+
 	public static SlotInfoWithUtilization from(SlotInfo slotInfo, double taskExecutorUtilization) {
 		return new SlotInfoWithUtilization(slotInfo, taskExecutorUtilization);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
index 5a1887d..d5023b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
@@ -85,4 +85,9 @@ public class SimpleSlotContext implements SlotContext {
 	public ResourceProfile getResourceProfile() {
 		return resourceProfile;
 	}
+
+	@Override
+	public boolean willBeOccupiedIndefinitely() {
+		return true;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java
new file mode 100644
index 0000000..f68ff75
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests whether the slot occupation state of {@link AllocatedSlot} is correct.
+ */
+public class AllocatedSlotOccupationTest extends TestLogger {
+
+	@Test
+	public void testSingleTaskOccupyingSlotIndefinitely() {
+		final PhysicalSlot physicalSlot = createPhysicalSlot();
+		allocateSingleLogicalSlotFromPhysicalSlot(physicalSlot, true);
+
+		assertThat(physicalSlot.willBeOccupiedIndefinitely(), is(true));
+	}
+
+	@Test
+	public void testSingleTaskNotOccupyingSlotIndefinitely() {
+		final PhysicalSlot physicalSlot = createPhysicalSlot();
+		allocateSingleLogicalSlotFromPhysicalSlot(physicalSlot, false);
+
+		assertThat(physicalSlot.willBeOccupiedIndefinitely(), is(false));
+	}
+
+	private static PhysicalSlot createPhysicalSlot() {
+		return new AllocatedSlot(
+			new AllocationID(),
+			new LocalTaskManagerLocation(),
+			0,
+			ResourceProfile.ANY,
+			new SimpleAckingTaskManagerGateway());
+	}
+
+	private static LogicalSlot allocateSingleLogicalSlotFromPhysicalSlot(
+			final PhysicalSlot physicalSlot,
+			final boolean slotWillBeOccupiedIndefinitely) {
+
+		return SingleLogicalSlot.allocateFromPhysicalSlot(
+			new SlotRequestId(),
+			physicalSlot,
+			Locality.UNKNOWN,
+			new TestingSlotOwner(),
+			slotWillBeOccupiedIndefinitely);
+	}
+}