Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/25 10:27:56 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13181: [FLINK-18957] Implement logical request bulk tracking in SlotSharingExecutionSlotAllocator

tillrohrmann commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r476318798



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -98,14 +101,15 @@ public SchedulerImpl(
 			"Scheduler is not initialized with proper main thread executor. " +
 				"Call to Scheduler.start(...) required.");
 
-		this.bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+		this.slotRequestBulkChecker = PhysicalSlotRequestBulkCheckerImpl.fromSlotPool(slotPool, SystemClock.getInstance());
+		this.bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool, slotRequestBulkChecker);

Review comment:
       This is somewhat unrelated, since the change was introduced earlier, but why do we have to touch `SchedulerImpl` at all? Isn't this implementation only used by the non-pipelined region scheduler implementations? It might have been easier to separate the scheduler implementations: keep `SchedulerImpl` for the old strategies and add a new `PipelinedRegionScheduler` implementation that supports only the bulk physical slot requests.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
##########
@@ -87,9 +86,12 @@
 			SlotRequestId slotRequestId,
 			ResourceProfile resourceProfile,
 			boolean willSlotBeOccupiedIndefinitely) {
-		return willSlotBeOccupiedIndefinitely ?
-			slotPool.requestNewAllocatedSlot(slotRequestId, resourceProfile, null) :
-			slotPool.requestNewAllocatedBatchSlot(slotRequestId, resourceProfile);
+		if (willSlotBeOccupiedIndefinitely) {
+			return slotPool.requestNewAllocatedSlot(slotRequestId, resourceProfile, null);
+		} else {
+			slotPool.disableBatchSlotRequestTimeoutCheck();

Review comment:
       Why can't this happen in the constructor?
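
       A minimal sketch of what the constructor variant could look like, assuming it is acceptable to disable the batch slot request timeout check unconditionally for every user of this provider. `SlotPoolStub`, `PhysicalSlotProviderSketch`, and `CountingSlotPool` are hypothetical stand-ins, not Flink classes; only the method name `disableBatchSlotRequestTimeoutCheck` is taken from the diff.

```java
import java.util.Objects;

// Hypothetical stand-in for the part of the slot pool that this hunk touches.
interface SlotPoolStub {
    void disableBatchSlotRequestTimeoutCheck();

    String requestNewAllocatedBatchSlot(String slotRequestId);
}

// Sketch: configure the slot pool once, when the provider is created.
final class PhysicalSlotProviderSketch {
    private final SlotPoolStub slotPool;

    PhysicalSlotProviderSketch(SlotPoolStub slotPool) {
        this.slotPool = Objects.requireNonNull(slotPool);
        // One-time call instead of repeating it on every batch slot request.
        slotPool.disableBatchSlotRequestTimeoutCheck();
    }

    String requestBatchSlot(String slotRequestId) {
        return slotPool.requestNewAllocatedBatchSlot(slotRequestId);
    }
}

// Test double that counts how often the timeout check gets disabled.
final class CountingSlotPool implements SlotPoolStub {
    int disableCalls = 0;

    @Override
    public void disableBatchSlotRequestTimeoutCheck() {
        disableCalls++;
    }

    @Override
    public String requestNewAllocatedBatchSlot(String slotRequestId) {
        return "slot-for-" + slotRequestId;
    }
}
```

       With this shape the request path no longer has a side effect on the pool, but the check is also disabled for providers that never issue a batch request, which may or may not be intended.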

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot requests.
+ *
+ * <p>The bulk gets canceled if the timeout occurs and the bulk is not fulfillable.

Review comment:
       Nit: inconsistent spelling of `fulfill` (`fulfil-ability` in the first sentence, `fulfillable` in the second).

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
##########
@@ -20,60 +20,17 @@
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
 
 /**
  * Represents a bulk of physical slot requests.
  */
-class PhysicalSlotRequestBulk {
+public interface PhysicalSlotRequestBulk {
+	Collection<ResourceProfile> getPendingRequests();
 
-	private final Map<SlotRequestId, ResourceProfile> pendingRequests;
+	Set<AllocationID> getAllocationIdsOfFulfilledRequests();
 
-	private final Map<SlotRequestId, AllocationID> fulfilledRequests = new HashMap<>();
-
-	private long unfulfillableTimestamp = Long.MAX_VALUE;
-
-	PhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest> physicalSlotRequests) {
-		this.pendingRequests = physicalSlotRequests.stream()
-			.collect(Collectors.toMap(
-				PhysicalSlotRequest::getSlotRequestId,
-				r -> r.getSlotProfile().getPhysicalSlotResourceProfile()));
-	}
-
-	void markRequestFulfilled(final SlotRequestId slotRequestId, final AllocationID allocationID) {
-		pendingRequests.remove(slotRequestId);
-		fulfilledRequests.put(slotRequestId, allocationID);
-	}
-
-	Map<SlotRequestId, ResourceProfile> getPendingRequests() {
-		return Collections.unmodifiableMap(pendingRequests);
-	}
-
-	Map<SlotRequestId, AllocationID> getFulfilledRequests() {
-		return Collections.unmodifiableMap(fulfilledRequests);
-	}
-
-	void markFulfillable() {
-		unfulfillableTimestamp = Long.MAX_VALUE;
-	}
-
-	void markUnfulfillable(final long currentTimestamp) {
-		if (isFulfillable()) {
-			unfulfillableTimestamp = currentTimestamp;
-		}
-	}
-
-	long getUnfulfillableSince() {
-		return unfulfillableTimestamp;
-	}
-
-	private boolean isFulfillable() {
-		return unfulfillableTimestamp == Long.MAX_VALUE;
-	}
+	void cancel(Throwable cause);

Review comment:
       Interfaces should always have JavaDocs stating their contracts.
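
       As an illustration only, the interface could carry JavaDocs along these lines. The contract wording is an assumed reading of the diff, not the authors' text, and `ResourceProfileStub`, `AllocationIdStub`, `PhysicalSlotRequestBulkSketch`, and `InMemoryBulk` are hypothetical stand-ins for the Flink types.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for Flink's ResourceProfile and AllocationID.
final class ResourceProfileStub {
    final int cpuCores;

    ResourceProfileStub(int cpuCores) {
        this.cpuCores = cpuCores;
    }
}

final class AllocationIdStub {
}

/**
 * Represents a bulk of physical slot requests.
 */
interface PhysicalSlotRequestBulkSketch {
    /**
     * Returns the resource profiles of the requests that are still pending.
     *
     * @return resource profiles of the not yet fulfilled requests
     */
    Collection<ResourceProfileStub> getPendingRequests();

    /**
     * Returns the allocation ids of the slots that already fulfill requests of this bulk.
     *
     * @return allocation ids of the fulfilled requests
     */
    Set<AllocationIdStub> getAllocationIdsOfFulfilledRequests();

    /**
     * Cancels all requests of this bulk.
     *
     * @param cause the reason for the cancellation
     */
    void cancel(Throwable cause);
}

// Trivial in-memory implementation, only here to show the contract in use.
final class InMemoryBulk implements PhysicalSlotRequestBulkSketch {
    private final List<ResourceProfileStub> pending = new ArrayList<>();
    private final Set<AllocationIdStub> fulfilled = new HashSet<>();
    Throwable cancellationCause;

    InMemoryBulk(Collection<ResourceProfileStub> requests) {
        pending.addAll(requests);
    }

    @Override
    public Collection<ResourceProfileStub> getPendingRequests() {
        return Collections.unmodifiableList(pending);
    }

    @Override
    public Set<AllocationIdStub> getAllocationIdsOfFulfilledRequests() {
        return Collections.unmodifiableSet(fulfilled);
    }

    @Override
    public void cancel(Throwable cause) {
        // Assumption of this sketch: canceling only records the cause.
        cancellationCause = cause;
    }
}
```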

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot requests.
+ *
+ * <p>The bulk gets canceled if the timeout occurs and the bulk is not fulfillable.
  */
-class PhysicalSlotRequestBulkChecker {
-
-	private final Supplier<Set<SlotInfo>> slotsRetriever;
-
-	private final Clock clock;
-
-	PhysicalSlotRequestBulkChecker(final Supplier<Set<SlotInfo>> slotsRetriever, final Clock clock) {
-		this.slotsRetriever = checkNotNull(slotsRetriever);
-		this.clock = checkNotNull(clock);
-	}
-
-	PhysicalSlotRequestBulk createPhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest> physicalSlotRequests) {
-		final PhysicalSlotRequestBulk slotRequestBulk = new PhysicalSlotRequestBulk(physicalSlotRequests);
-		slotRequestBulk.markUnfulfillable(clock.relativeTimeMillis());
-
-		return slotRequestBulk;
-	}
-
-	/**
-	 * Check the slot request bulk and timeout its requests if it has been unfulfillable for too long.
-	 * @param slotRequestBulk bulk of slot requests
-	 * @param slotRequestTimeout indicates how long a pending request can be unfulfillable
-	 * @return result of the check, indicating the bulk is fulfilled, still pending, or timed out
-	 */
-	TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
-			final PhysicalSlotRequestBulk slotRequestBulk,
-			final Time slotRequestTimeout) {
-
-		if (slotRequestBulk.getPendingRequests().isEmpty()) {
-			return TimeoutCheckResult.FULFILLED;
-		}
-
-		final boolean fulfillable = isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
-		if (fulfillable) {
-			slotRequestBulk.markFulfillable();
-		} else {
-			final long currentTimestamp = clock.relativeTimeMillis();
-
-			slotRequestBulk.markUnfulfillable(currentTimestamp);
-
-			final long unfulfillableSince = slotRequestBulk.getUnfulfillableSince();
-			if (unfulfillableSince + slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
-				return TimeoutCheckResult.TIMEOUT;
-			}
-		}
-
-		return TimeoutCheckResult.PENDING;
-	}
-
-	/**
-	 * Returns whether the given bulk of slot requests are possible to be fulfilled at the same time
-	 * with all the reusable slots in the slot pool. A reusable slot means the slot is available or
-	 * will not be occupied indefinitely.
-	 *
-	 * @param slotRequestBulk bulk of slot requests to check
-	 * @param slotsRetriever supplies slots to be used for the fulfill-ability check
-	 * @return true if the slot requests are possible to be fulfilled, otherwise false
-	 */
-	@VisibleForTesting
-	static boolean isSlotRequestBulkFulfillable(
-			final PhysicalSlotRequestBulk slotRequestBulk,
-			final Supplier<Set<SlotInfo>> slotsRetriever) {
-
-		final Set<AllocationID> assignedSlots = new HashSet<>(slotRequestBulk.getFulfilledRequests().values());
-		final Set<SlotInfo> reusableSlots = getReusableSlots(slotsRetriever, assignedSlots);
-		return areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests().values(), reusableSlots);
-	}
-
-	private static Set<SlotInfo> getReusableSlots(
-			final Supplier<Set<SlotInfo>> slotsRetriever,
-			final Set<AllocationID> slotsToExclude) {
-
-		return slotsRetriever.get().stream()
-			.filter(slotInfo -> !slotInfo.willBeOccupiedIndefinitely())
-			.filter(slotInfo -> !slotsToExclude.contains(slotInfo.getAllocationId()))
-			.collect(Collectors.toSet());
-	}
-
-	private static boolean areRequestsFulfillableWithSlots(
-			final Collection<ResourceProfile> requestResourceProfiles,
-			final Set<SlotInfo> slots) {
-
-		final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
-		for (ResourceProfile requestResourceProfile : requestResourceProfiles) {
-			final Optional<SlotInfo> matchedSlot = findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
-			if (matchedSlot.isPresent()) {
-				remainingSlots.remove(matchedSlot.get());
-			} else {
-				return false;
-			}
-		}
-		return true;
-	}
-
-	private static Optional<SlotInfo> findMatchingSlotForRequest(
-			final ResourceProfile requestResourceProfile,
-			final Collection<SlotInfo> slots) {
-
-		return slots.stream().filter(slot -> slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
-	}
-
-	enum TimeoutCheckResult {
-		PENDING,
-
-		FULFILLED,
-
-		TIMEOUT
-	}
+@FunctionalInterface
+public interface PhysicalSlotRequestBulkChecker {
+	void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time timeout);

Review comment:
       Interfaces should always have JavaDocs stating their contracts.
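
       For the checker, a documented version might look like the sketch below. The class-level sentence reuses the comment from the diff; the method-level wording is an assumption. `BulkStub` and `BulkCheckerSketch` are hypothetical names, and a plain `long` in milliseconds replaces Flink's `Time` to keep the example self-contained.

```java
// Hypothetical stand-in for the bulk type; only cancel(...) is needed here.
interface BulkStub {
    void cancel(Throwable cause);
}

/**
 * Tracks a fulfillability timeout of a bulk of physical slot requests.
 *
 * <p>The bulk gets canceled if the timeout occurs and the bulk is not fulfillable.
 */
@FunctionalInterface
interface BulkCheckerSketch {
    /**
     * Starts tracking the given bulk of pending requests.
     *
     * @param bulk the bulk to track
     * @param timeoutMillis how long the bulk may stay unfulfillable before it is canceled
     */
    void schedulePendingRequestBulkTimeoutCheck(BulkStub bulk, long timeoutMillis);
}
```

       Because the interface has a single abstract method, a test can supply a lambda, for example `(bulk, timeout) -> bulk.cancel(new java.util.concurrent.TimeoutException())`, to simulate a checker that times out immediately.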



