You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/21 14:29:36 UTC

[GitHub] [flink] zentol commented on a change in pull request #13722: [FLINK-19636][coordination] Add DeclarativeSlotPool

zentol commented on a change in pull request #13722:
URL: https://github.com/apache/flink/pull/13722#discussion_r509336961



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Slot pool interface which uses Flink's declarative resource management
+ * protocol to acquire resources.
+ *
+ * <p>In order to acquire new resources, users need to increase the required
+ * resources. Once they no longer need the resources, users need to decrease
+ * the required resources so that superfluous resources can be returned.
+ */
+public interface DeclarativeSlotPool {
+
+	/**
+	 * Increases the resource requirements by increment.
+	 *
+	 * @param increment increment by which to increase the resource requirements
+	 */
+	void increaseResourceRequirementsBy(ResourceCounter increment);
+
+	/**
+	 * Decreases the resource requirements by decrement.
+	 *
+	 * @param decrement decrement by which to decrease the resource requirements
+	 */
+	void decreaseResourceRequirementsBy(ResourceCounter decrement);
+
+	/**
+	 * Returns the current resource requirements.
+	 *
+	 * @return current resource requirements
+	 */
+	Collection<ResourceRequirement> getResourceRequirements();
+
+	/**
+	 * Offers slots to this slot pool. The slot pool is free to accept as many slots as it
+	 * needs.
+	 *
+	 * @param offers offers containing the list of slots offered to this slot pool
+	 * @param taskManagerLocation taskManagerLocation is the location of the offering TaskExecutor
+	 * @param taskManagerGateway taskManagerGateway is the gateway to talk to the offering TaskExecutor
+	 * @param currentTime currentTime is the time the slots are being offered
+	 * @return collection of accepted slots; the other slot offers are implicitly rejected
+	 */
+	Collection<SlotOffer> offerSlots(Collection<? extends SlotOffer> offers, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long currentTime);
+
+	/**
+	 * Returns the slot information for all free slots (slots which can be allocated from the slot pool).
+	 *
+	 * @return collection of free slot information
+	 */
+	Collection<SlotInfoWithUtilization> getFreeSlotsInformation();
+
+	/**
+	 * Returns the slot information for all slots (free and allocated slots).
+	 *
+	 * @return collection of slot information
+	 */
+	Collection<? extends SlotInfo> getAllSlotsInformation();
+
+	/**
+	 * Reserves the free slot identified by the given allocationId and maps it to
+	 * the given requiredSlotProfile.
+	 *
+	 * @param allocationId allocationId identifies the free slot to allocate
+	 * @param requiredSlotProfile requiredSlotProfile specifying the resource requirement
+	 * @return a PhysicalSlot representing the allocated slot
+	 * @throws IllegalStateException if no free slot with the given allocationId exists or if
+	 *                               the specified slot cannot fulfill the requiredSlotProfile
+	 */
+	PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile);
+
+	/**
+	 * Frees the reserved a slot identified by the given allocationId. If no slot
+	 * with allocationId exists, then the call is ignored.
+	 *
+	 * <p>Whether the freed slot is returned to the owning TaskExecutor is implementation
+	 * dependent.
+	 *
+	 * @param allocationId allocationId identifying the slot to release
+	 * @param cause cause for releasing the slot; can be {@code null}
+	 * @param currentTime currentTime when the slot was released
+	 * @return info about freed slot, if a reserved slot with the given allocationId existed
+	 */
+	Optional<ResourceCounter> freeReservedSlot(AllocationID allocationId, @Nullable Throwable cause, long currentTime);

Review comment:
       Optional is unnecessary; the returned ResourceCounter could just be empty

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Slot pool interface which uses Flink's declarative resource management
+ * protocol to acquire resources.
+ *
+ * <p>In order to acquire new resources, users need to increase the required
+ * resources. Once they no longer need the resources, users need to decrease
+ * the required resources so that superfluous resources can be returned.
+ */
+public interface DeclarativeSlotPool {
+
+	/**
+	 * Increases the resource requirements by increment.
+	 *
+	 * @param increment increment by which to increase the resource requirements
+	 */
+	void increaseResourceRequirementsBy(ResourceCounter increment);
+
+	/**
+	 * Decreases the resource requirements by decrement.
+	 *
+	 * @param decrement decrement by which to decrease the resource requirements
+	 */
+	void decreaseResourceRequirementsBy(ResourceCounter decrement);
+
+	/**
+	 * Returns the current resource requirements.
+	 *
+	 * @return current resource requirements
+	 */
+	Collection<ResourceRequirement> getResourceRequirements();
+
+	/**
+	 * Offers slots to this slot pool. The slot pool is free to accept as many slots as it
+	 * needs.
+	 *
+	 * @param offers offers containing the list of slots offered to this slot pool
+	 * @param taskManagerLocation taskManagerLocation is the location of the offering TaskExecutor
+	 * @param taskManagerGateway taskManagerGateway is the gateway to talk to the offering TaskExecutor
+	 * @param currentTime currentTime is the time the slots are being offered
+	 * @return collection of accepted slots; the other slot offers are implicitly rejected
+	 */
+	Collection<SlotOffer> offerSlots(Collection<? extends SlotOffer> offers, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, long currentTime);
+
+	/**
+	 * Returns the slot information for all free slots (slots which can be allocated from the slot pool).
+	 *
+	 * @return collection of free slot information
+	 */
+	Collection<SlotInfoWithUtilization> getFreeSlotsInformation();
+
+	/**
+	 * Returns the slot information for all slots (free and allocated slots).
+	 *
+	 * @return collection of slot information
+	 */
+	Collection<? extends SlotInfo> getAllSlotsInformation();
+
+	/**
+	 * Reserves the free slot identified by the given allocationId and maps it to
+	 * the given requiredSlotProfile.
+	 *
+	 * @param allocationId allocationId identifies the free slot to allocate
+	 * @param requiredSlotProfile requiredSlotProfile specifying the resource requirement
+	 * @return a PhysicalSlot representing the allocated slot
+	 * @throws IllegalStateException if no free slot with the given allocationId exists or if
+	 *                               the specified slot cannot fulfill the requiredSlotProfile
+	 */
+	PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile);
+
+	/**
+	 * Frees the reserved a slot identified by the given allocationId. If no slot
+	 * with allocationId exists, then the call is ignored.
+	 *
+	 * <p>Whether the freed slot is returned to the owning TaskExecutor is implementation
+	 * dependent.
+	 *
+	 * @param allocationId allocationId identifying the slot to release
+	 * @param cause cause for releasing the slot; can be {@code null}
+	 * @param currentTime currentTime when the slot was released
+	 * @return info about freed slot, if a reserved slot with the given allocationId existed
+	 */
+	Optional<ResourceCounter> freeReservedSlot(AllocationID allocationId, @Nullable Throwable cause, long currentTime);
+
+	/**
+	 * Releases all slots belonging to the owning TaskExecutor if it has been registered.
+	 *
+	 * @param owner owner identifying the owning TaskExecutor
+	 * @param cause cause for failing the slots
+	 * @return info about released slot
+	 */
+	ResourceCounter releaseSlots(ResourceID owner, Exception cause);
+
+	/**
+	 * Releases the slot specified by allocationId if one exists.
+	 *
+	 * @param allocationId allocationId identifying the slot to fail
+	 * @param cause cause for failing the slot
+	 * @return info about released slot, if a slot with the given allocationId existed
+	 */
+	Optional<ResourceCounter> releaseSlot(AllocationID allocationId, Exception cause);

Review comment:
       same as above




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org