Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/21 08:28:45 UTC

[GitHub] [flink] zhuzhurk commented on a change in pull request #13181: [FLINK-18957] Implement logical request bulk tracking in SlotSharingExecutionSlotAllocator

zhuzhurk commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r474428620



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBulkChecker {
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+	private final Clock clock;
+
+	PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>> slotsRetriever, final Clock clock) {
+		this.slotsRetriever = checkNotNull(slotsRetriever);
+		this.clock = checkNotNull(clock);
+	}
+
+	void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+		this.componentMainThreadExecutor = mainThreadExecutor;
+	}
+
+	@Override
+	public void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time timeout) {
+		schedulePendingRequestBulkTimeoutCheck(new PhysicalSlotRequestBulkWithTimestamp(bulk), timeout);
+	}
+
+	private void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp bulk, Time timeout) {
+		bulk.markUnfulfillable(clock.relativeTimeMillis());

Review comment:
        If we implement it like this, we effectively require the bulk to pass every single fulfillability check; otherwise it times out. As a result, the bulk can be recognized as timed out if a slot gets lost right before a fulfillability check, even if the bulk passed the previous check.
   
   How about moving this `markUnfulfillable()` call into the public `schedulePendingRequestBulkTimeoutCheck()` above, so that it is only invoked once, when the bulk is first scheduled?
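A minimal, self-contained Java simulation of the scenario described above may help. It assumes (the class body is not part of this diff) that `markUnfulfillable(now)` only records a timestamp while the bulk is considered fulfillable and that `markFulfillable()` resets it. `BulkTimestamp`, `CheckResult` and `TimeoutCheckSimulation` are hypothetical names, not Flink classes; `check` mirrors `checkPhysicalSlotRequestBulkTimeout` for a bulk that still has pending requests.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for PhysicalSlotRequestBulkWithTimestamp; its semantics are assumed. */
final class BulkTimestamp {
    private long unfulfillableSince = Long.MAX_VALUE;

    boolean isFulfillable() {
        return unfulfillableSince == Long.MAX_VALUE;
    }

    void markFulfillable() {
        unfulfillableSince = Long.MAX_VALUE;
    }

    /** Records only the first moment of an unfulfillable period. */
    void markUnfulfillable(long now) {
        if (isFulfillable()) {
            unfulfillableSince = now;
        }
    }

    long getUnfulfillableSince() {
        return unfulfillableSince;
    }
}

enum CheckResult { PENDING, TIMEOUT }

final class TimeoutCheckSimulation {

    /** Mirrors checkPhysicalSlotRequestBulkTimeout for a bulk that still has pending requests. */
    static CheckResult check(BulkTimestamp bulk, boolean fulfillable, long now, long timeoutMillis) {
        if (fulfillable) {
            bulk.markFulfillable();
            return CheckResult.PENDING;
        }
        bulk.markUnfulfillable(now);
        return bulk.getUnfulfillableSince() + timeoutMillis <= now
            ? CheckResult.TIMEOUT
            : CheckResult.PENDING;
    }

    /**
     * Runs one check per timeout period (at t = timeout, 2 * timeout, ...).
     * markOnEveryReschedule = true models the current code; false models the proposal,
     * where markUnfulfillable() is only called once when the bulk is first scheduled.
     */
    static List<CheckResult> run(boolean[] fulfillableAtCheck, long timeoutMillis, boolean markOnEveryReschedule) {
        BulkTimestamp bulk = new BulkTimestamp();
        bulk.markUnfulfillable(0L); // initial scheduling at t = 0, in both variants
        List<CheckResult> results = new ArrayList<>();
        long now = 0L;
        for (boolean fulfillable : fulfillableAtCheck) {
            if (markOnEveryReschedule) {
                bulk.markUnfulfillable(now); // what the private schedule method does on each reschedule
            }
            now += timeoutMillis;
            CheckResult result = check(bulk, fulfillable, now, timeoutMillis);
            results.add(result);
            if (result == CheckResult.TIMEOUT) {
                break; // the bulk would be canceled here
            }
        }
        return results;
    }
}
```

With the check pattern {fulfillable, unfulfillable} and a 100 ms timeout, the current variant times out at the second check, whereas the proposed variant keeps the bulk pending, because its unfulfillable period only starts at that second check.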
    

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot requests.
+ *
+ * <p>The bulk gets canceled if the timeout occurs and the bulk is not fulfilled.

Review comment:
       `is not fulfilled` -> `is not fulfillable`
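To make the distinction concrete: a bulk is "fulfilled" once it has no pending requests, while it is "fulfillable" if all of its pending requests could be served at once by distinct reusable slots, which is what `isSlotRequestBulkFulfillable` checks. The sketch below is a simplified model, not Flink code: resource profiles are plain `int` sizes and, as an assumption standing in for `ResourceProfile#isMatching`, a slot matches a request if it is at least as large. `BulkStatus` is a hypothetical name; the matching loop follows the same first-fit approach as `areRequestsFulfillableWithSlots`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model: profiles and slot sizes are plain ints (e.g. MB of memory). */
final class BulkStatus {

    /** "Fulfilled": every request of the bulk already has a slot assigned. */
    static boolean isFulfilled(List<Integer> pendingRequests) {
        return pendingRequests.isEmpty();
    }

    /**
     * "Fulfillable": each pending request can be matched to a distinct reusable slot.
     * Assumption: a slot matches a request if its size is at least the requested size.
     */
    static boolean isFulfillable(List<Integer> pendingRequests, List<Integer> reusableSlots) {
        List<Integer> remaining = new ArrayList<>(reusableSlots);
        for (int request : pendingRequests) {
            int matchIndex = -1;
            for (int i = 0; i < remaining.size(); i++) {
                if (remaining.get(i) >= request) {
                    matchIndex = i; // first fit
                    break;
                }
            }
            if (matchIndex < 0) {
                return false; // no remaining slot can serve this request
            }
            remaining.remove(matchIndex); // int overload: removes by position, so a slot is used once
        }
        return true;
    }
}
```

A bulk with two pending 512 MB requests is neither fulfilled nor, given slots of 1024 MB and 256 MB, fulfillable; that is the state in which the timeout should lead to cancellation.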

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -41,16 +41,12 @@
 class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
 	private final PreferredLocationsRetriever preferredLocationsRetriever;
 
-	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
-
 	private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
 
 	MergingSharedSlotProfileRetrieverFactory(

Review comment:
       It seems the extra indentation of all method parameters in this class was removed unexpectedly.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBulkChecker {
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+	private final Clock clock;
+
+	PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>> slotsRetriever, final Clock clock) {
+		this.slotsRetriever = checkNotNull(slotsRetriever);
+		this.clock = checkNotNull(clock);
+	}
+
+	void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+		this.componentMainThreadExecutor = mainThreadExecutor;
+	}
+
+	@Override
+	public void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time timeout) {
+		schedulePendingRequestBulkTimeoutCheck(new PhysicalSlotRequestBulkWithTimestamp(bulk), timeout);
+	}
+
+	private void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp bulk, Time timeout) {
+		bulk.markUnfulfillable(clock.relativeTimeMillis());
+		componentMainThreadExecutor.schedule(() -> {
+			TimeoutCheckResult result = checkPhysicalSlotRequestBulkTimeout(bulk, timeout);
+
+			switch (result) {
+				case PENDING:
+					//re-schedule the timeout check
+					schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
+					break;
+				case TIMEOUT:
+					bulk.getPhysicalSlotRequestBulk().cancel(new TimeoutException("Slot request bulk is not fulfillable!"));
+					break;
+				case FULFILLED:
+				default:
+					// no action to take
+					break;
+			}
+		}, timeout.getSize(), timeout.getUnit());
+	}
+
+	/**
+	 * Check the slot request bulk and timeout its requests if it has been unfulfillable for too long.
+	 * @param slotRequestBulk bulk of slot requests
+	 * @param slotRequestTimeout indicates how long a pending request can be unfulfillable
+	 * @return result of the check, indicating the bulk is fulfilled, still pending, or timed out
+	 */
+	@VisibleForTesting
+	TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+			final PhysicalSlotRequestBulkWithTimestamp slotRequestBulk,
+			final Time slotRequestTimeout) {
+
+		if (slotRequestBulk.getPhysicalSlotRequestBulk().getPendingRequests().isEmpty()) {
+			return TimeoutCheckResult.FULFILLED;
+		}
+
+		final boolean fulfillable = isSlotRequestBulkFulfillable(slotRequestBulk.getPhysicalSlotRequestBulk(), slotsRetriever);
+		if (fulfillable) {
+			slotRequestBulk.markFulfillable();
+		} else {
+			final long currentTimestamp = clock.relativeTimeMillis();
+
+			slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+			final long unfulfillableSince = slotRequestBulk.getUnfulfillableSince();
+			if (unfulfillableSince + slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+				return TimeoutCheckResult.TIMEOUT;
+			}
+		}
+
+		return TimeoutCheckResult.PENDING;
+	}
+
+	/**
+	 * Returns whether the given bulk of slot requests are possible to be fulfilled at the same time
+	 * with all the reusable slots in the slot pool. A reusable slot means the slot is available or
+	 * will not be occupied indefinitely.
+	 *
+	 * @param slotRequestBulk bulk of slot requests to check
+	 * @param slotsRetriever supplies slots to be used for the fulfill-ability check
+	 * @return true if the slot requests are possible to be fulfilled, otherwise false
+	 */
+	@VisibleForTesting
+	static boolean isSlotRequestBulkFulfillable(
+			final PhysicalSlotRequestBulk slotRequestBulk,
+			final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+		final Set<AllocationID> assignedSlots = slotRequestBulk.getAllocationIdsOfFulfilledRequests();
+		final Set<SlotInfo> reusableSlots = getReusableSlots(slotsRetriever, assignedSlots);
+		return areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(), reusableSlots);
+	}
+
+	private static Set<SlotInfo> getReusableSlots(
+			final Supplier<Set<SlotInfo>> slotsRetriever,
+			final Set<AllocationID> slotsToExclude) {
+
+		return slotsRetriever.get().stream()
+			.filter(slotInfo -> !slotInfo.willBeOccupiedIndefinitely())
+			.filter(slotInfo -> !slotsToExclude.contains(slotInfo.getAllocationId()))
+			.collect(Collectors.toSet());
+	}
+
+	private static boolean areRequestsFulfillableWithSlots(
+			final Collection<ResourceProfile> requestResourceProfiles,
+			final Set<SlotInfo> slots) {
+
+		final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+		for (ResourceProfile requestResourceProfile : requestResourceProfiles) {
+			final Optional<SlotInfo> matchedSlot = findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+			if (matchedSlot.isPresent()) {
+				remainingSlots.remove(matchedSlot.get());
+			} else {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static Optional<SlotInfo> findMatchingSlotForRequest(
+			final ResourceProfile requestResourceProfile,
+			final Collection<SlotInfo> slots) {
+
+		return slots.stream().filter(slot -> slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
+	}
+
+	static PhysicalSlotRequestBulkCheckerImpl fromSlotPool(final SlotPool slotPool, final Clock clock) {
+		return new PhysicalSlotRequestBulkCheckerImpl(() -> getAllSlotInfos(slotPool), clock);
+	}
+
+	private static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) {
+		return Stream
+			.concat(
+				slotPool.getAvailableSlotsInformation().stream(),
+				slotPool.getAllocatedSlotsInformation().stream())
+			.collect(Collectors.toSet());
+	}
+
+	enum TimeoutCheckResult {
+		PENDING,
+
+		FULFILLED,
+
+		TIMEOUT
+	}
+
+	@VisibleForTesting
+	static class PhysicalSlotRequestBulkWithTimestamp {

Review comment:
       How about letting it implement `PhysicalSlotRequestBulk` to simplify its usages and hide the wrapped `PhysicalSlotRequestBulk`? 
   And what about moving it into a separate class file? I can see there is already a separate test class for it.
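A rough sketch of this suggestion, written against a hypothetical, simplified `SimpleBulk` interface in place of `PhysicalSlotRequestBulk` (JDK types instead of `ResourceProfile`/`AllocationID`, and only the three methods visible in this diff). The wrapper implements the interface itself and delegates to the wrapped bulk, so callers could write `bulk.getPendingRequests()` instead of `bulk.getPhysicalSlotRequestBulk().getPendingRequests()`. The timestamp semantics are assumed, since the class body is not part of this diff.

```java
import java.util.Collection;
import java.util.Set;

/** Hypothetical, simplified stand-in for PhysicalSlotRequestBulk. */
interface SimpleBulk {
    Collection<Integer> getPendingRequests();

    Set<String> getAllocationIdsOfFulfilledRequests();

    void cancel(Throwable cause);
}

/** Decorator: adds the unfulfillable timestamp while exposing the bulk interface itself. */
final class TimestampedBulk implements SimpleBulk {
    private final SimpleBulk delegate;
    private long unfulfillableSince = Long.MAX_VALUE;

    TimestampedBulk(SimpleBulk delegate) {
        this.delegate = delegate;
    }

    // Bulk methods are forwarded, so the wrapped bulk never needs to be exposed.
    @Override
    public Collection<Integer> getPendingRequests() {
        return delegate.getPendingRequests();
    }

    @Override
    public Set<String> getAllocationIdsOfFulfilledRequests() {
        return delegate.getAllocationIdsOfFulfilledRequests();
    }

    @Override
    public void cancel(Throwable cause) {
        delegate.cancel(cause);
    }

    // Timestamp bookkeeping (assumed semantics).
    boolean isFulfillable() {
        return unfulfillableSince == Long.MAX_VALUE;
    }

    void markFulfillable() {
        unfulfillableSince = Long.MAX_VALUE;
    }

    void markUnfulfillable(long now) {
        if (isFulfillable()) {
            unfulfillableSince = now;
        }
    }

    long getUnfulfillableSince() {
        return unfulfillableSince;
    }
}
```

Because the wrapper is itself a bulk, the checker could accept and cancel it directly, and the accessor for the inner bulk could be dropped.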

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -98,14 +103,16 @@ public SchedulerImpl(
 			"Scheduler is not initialized with proper main thread executor. " +
 				"Call to Scheduler.start(...) required.");
 
-		this.bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+		this.slotRequestBulkChecker = PhysicalSlotRequestBulkCheckerImpl.fromSlotPool(slotPool, SystemClock.getInstance());
+		this.bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool, slotRequestBulkChecker);
+		this.physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);

Review comment:
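       The `PhysicalSlotProviderImpl` constructor invokes `slotPool.disableBatchSlotRequestTimeoutCheck()` unconditionally, so the check gets disabled whenever this code runs, even if `physicalSlotProvider` is never used.
   Maybe we should move that invocation to `PhysicalSlotProviderImpl#requestNewSlot()`?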
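A minimal sketch of the suggested change. `BatchTimeoutSwitch`, `CountingPool` and `LazyPhysicalSlotProvider` are hypothetical types standing in for the slot pool and `PhysicalSlotProviderImpl`, and `requestNewSlot` is simplified to take and return strings; only `disableBatchSlotRequestTimeoutCheck()` comes from the comment. The sketch assumes repeated calls to that method are harmless; if they are not, a boolean guard in the provider would ensure it runs only once.

```java
/** Hypothetical minimal view of the slot pool; only the method named in the review is modeled. */
interface BatchTimeoutSwitch {
    void disableBatchSlotRequestTimeoutCheck();
}

/** Counts invocations so the behavior can be observed. */
final class CountingPool implements BatchTimeoutSwitch {
    int disableCalls;

    @Override
    public void disableBatchSlotRequestTimeoutCheck() {
        disableCalls++;
    }
}

/** Sketch of a provider that disables the pool's batch timeout check lazily. */
final class LazyPhysicalSlotProvider {
    private final BatchTimeoutSwitch slotPool;

    LazyPhysicalSlotProvider(BatchTimeoutSwitch slotPool) {
        // No side effect on the pool here: merely creating the provider leaves
        // the batch slot request timeout check untouched.
        this.slotPool = slotPool;
    }

    /** Hypothetical, simplified request method. */
    String requestNewSlot(String slotRequestId) {
        // The check is only disabled once this provider is actually used.
        slotPool.disableBatchSlotRequestTimeoutCheck();
        return "pending:" + slotRequestId;
    }
}
```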

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkImpl.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a bulk of physical slot requests.
+ */
+class PhysicalSlotRequestBulkImpl implements PhysicalSlotRequestBulk {

Review comment:
       I think this class can also be removed when we remove `BulkSlotProvider` and `OneSlotPerExecutionSlotAllocator`.
   But it is used in tests of `PhysicalSlotRequestBulkCheckerImpl` and `PhysicalSlotRequestBulkWithTimestamp`.
   How about adding a separate `TestingPhysicalSlotRequestBulk` for those tests? 
   I'm also fine if we do it in a separate task or when we are removing the `BulkSlotProvider`.
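One possible shape for such a test double, sketched against a hypothetical simplified interface (`SlotRequestBulkView`, using JDK types instead of `ResourceProfile`/`AllocationID`), because the full `PhysicalSlotRequestBulk` signature is not part of this diff. The test sets the bulk's state directly rather than going through slot allocation callbacks, and cancellation is only recorded so that tests can assert on it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simplified bulk interface. */
interface SlotRequestBulkView {
    Collection<Integer> getPendingRequests();

    Set<String> getAllocationIdsOfFulfilledRequests();

    void cancel(Throwable cause);
}

/** Test double: state is configured by the test instead of by slot allocations. */
final class TestingPhysicalSlotRequestBulk implements SlotRequestBulkView {
    private final List<Integer> pendingRequests = new ArrayList<>();
    private final Set<String> fulfilledAllocationIds = new HashSet<>();
    private Throwable cancelCause;

    TestingPhysicalSlotRequestBulk withPendingRequest(int resourceProfile) {
        pendingRequests.add(resourceProfile);
        return this;
    }

    TestingPhysicalSlotRequestBulk withFulfilledAllocation(String allocationId) {
        fulfilledAllocationIds.add(allocationId);
        return this;
    }

    @Override
    public Collection<Integer> getPendingRequests() {
        return Collections.unmodifiableList(pendingRequests);
    }

    @Override
    public Set<String> getAllocationIdsOfFulfilledRequests() {
        return Collections.unmodifiableSet(fulfilledAllocationIds);
    }

    @Override
    public void cancel(Throwable cause) {
        cancelCause = cause; // recorded only; nothing is actually released
    }

    boolean isCancelled() {
        return cancelCause != null;
    }

    Throwable getCancelCause() {
        return cancelCause;
    }
}
```

Keeping the double this small means the checker tests no longer depend on `PhysicalSlotRequestBulkImpl`, so that class could be removed together with `BulkSlotProvider`.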




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org