You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/08 00:38:01 UTC
[flink] 03/03: [FLINK-21098][coordination] Add SlotAllocator
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d1f099c3c022e9274171ddb1a8499b6b111c1656
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 28 11:30:37 2021 +0100
[FLINK-21098][coordination] Add SlotAllocator
---
.../declarative/allocator/FreeSlotFunction.java | 38 ++++
.../declarative/allocator/JobInformation.java | 39 ++++
.../declarative/allocator/ReserveSlotFunction.java | 35 +++
.../declarative/allocator/SharedSlot.java | 141 ++++++++++++
.../declarative/allocator/SlotAllocator.java | 71 ++++++
.../allocator/SlotSharingSlotAllocator.java | 215 ++++++++++++++++++
.../declarative/allocator/VertexParallelism.java | 34 +++
.../VertexParallelismWithSlotSharing.java | 47 ++++
.../declarative/allocator/SharedSlotTest.java | 180 ++++++++++++++++
.../allocator/SlotSharingSlotAllocatorTest.java | 239 +++++++++++++++++++++
.../declarative/allocator/TestSlotInfo.java | 55 +++++
11 files changed, 1094 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/FreeSlotFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/FreeSlotFunction.java
new file mode 100644
index 0000000..a92ad6f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/FreeSlotFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import javax.annotation.Nullable;
+
+/** A function for freeing slots. */
+@FunctionalInterface
+public interface FreeSlotFunction {
+ /**
+ * Frees the slot identified by the given {@link AllocationID}.
+ *
+ * <p>If the slot is freed due to exceptional circumstances a {@link Throwable} cause should be
+ * provided.
+ *
+ * @param allocationId identifies the slot
+ * @param cause reason why the slot was freed; null during normal operations
+ * @param timestamp when the slot was freed
+ */
+ void freeSlot(AllocationID allocationId, @Nullable Throwable cause, long timestamp);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
new file mode 100644
index 0000000..f9c9015
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+
+import java.util.Collection;
+
+/** Information about the job. */
+public interface JobInformation {
+ Collection<SlotSharingGroup> getSlotSharingGroups();
+
+ VertexInformation getVertexInformation(JobVertexID jobVertexId);
+
+ /** Information about a single vertex. */
+ interface VertexInformation {
+ JobVertexID getJobVertexID();
+
+ int getParallelism();
+
+ SlotSharingGroup getSlotSharingGroup();
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/ReserveSlotFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/ReserveSlotFunction.java
new file mode 100644
index 0000000..3a26ee4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/ReserveSlotFunction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+
+/** A function for reserving slots. */
+@FunctionalInterface
+public interface ReserveSlotFunction {
+ /**
+ * Reserves the slot identified by the given allocation ID for the given resource profile.
+ *
+ * @param allocationId identifies the slot
+ * @param resourceProfile resource profile the slot must be able to fulfill
+ * @return reserved slot
+ */
+ PhysicalSlot reserveSlot(AllocationID allocationId, ResourceProfile resourceProfile);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
new file mode 100644
index 0000000..90cce74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Shared slot implementation for the declarative scheduler. */
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
+ private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);
+
+ private final SlotRequestId physicalSlotRequestId;
+
+ private final PhysicalSlot physicalSlot;
+
+ private final Runnable externalReleaseCallback;
+
+ private final Map<SlotRequestId, LogicalSlot> allocatedLogicalSlots;
+
+ private final boolean slotWillBeOccupiedIndefinitely;
+
+ private State state;
+
+ public SharedSlot(
+ SlotRequestId physicalSlotRequestId,
+ PhysicalSlot physicalSlot,
+ boolean slotWillBeOccupiedIndefinitely,
+ Runnable externalReleaseCallback) {
+ this.physicalSlotRequestId = physicalSlotRequestId;
+ this.physicalSlot = physicalSlot;
+ this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+ this.externalReleaseCallback = externalReleaseCallback;
+ this.allocatedLogicalSlots = new HashMap<>();
+
+ Preconditions.checkState(
+ physicalSlot.tryAssignPayload(this),
+ "The provided slot (%s) was not free.",
+ physicalSlot.getAllocationId());
+ this.state = State.ALLOCATED;
+ }
+
+ /**
+ * Registers an allocation request for a logical slot.
+ *
+ * @return the logical slot
+ */
+ public LogicalSlot allocateLogicalSlot() {
+ LOG.debug("Allocating logical slot from shared slot ({})", physicalSlotRequestId);
+ Preconditions.checkState(
+ state == State.ALLOCATED, "The shared slot has already been released.");
+
+ final LogicalSlot slot =
+ new SingleLogicalSlot(
+ new SlotRequestId(),
+ physicalSlot,
+ Locality.UNKNOWN,
+ this,
+ slotWillBeOccupiedIndefinitely);
+
+ allocatedLogicalSlots.put(slot.getSlotRequestId(), slot);
+ return slot;
+ }
+
+ @Override
+ public void returnLogicalSlot(LogicalSlot logicalSlot) {
+ LOG.debug("Returning logical slot to shared slot ({})", physicalSlotRequestId);
+ Preconditions.checkState(
+ state == State.ALLOCATED, "The shared slot has already been released.");
+
+ Preconditions.checkState(!logicalSlot.isAlive(), "Returned logic slot must not be alive.");
+ Preconditions.checkState(
+ allocatedLogicalSlots.remove(logicalSlot.getSlotRequestId()) != null,
+ "Trying to remove a logical slot request which has been either already removed or never created.");
+ tryReleaseExternally();
+ }
+
+ @Override
+ public void release(Throwable cause) {
+ LOG.debug("Release shared slot ({})", physicalSlotRequestId);
+ Preconditions.checkState(
+ state == State.ALLOCATED, "The shared slot has already been released.");
+
+ // copy the logical slot collection to avoid ConcurrentModificationException
+ // if logical slot releases cause cancellation of other executions
+ // which will try to call returnLogicalSlot and modify allocatedLogicalSlots collection
+ final List<LogicalSlot> logicalSlotsToRelease =
+ new ArrayList<>(allocatedLogicalSlots.values());
+ for (LogicalSlot allocatedLogicalSlot : logicalSlotsToRelease) {
+ allocatedLogicalSlot.releaseSlot(cause);
+ }
+ allocatedLogicalSlots.clear();
+ tryReleaseExternally();
+ }
+
+ private void tryReleaseExternally() {
+ if (state != State.RELEASED && allocatedLogicalSlots.isEmpty()) {
+ state = State.RELEASED;
+ LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
+ externalReleaseCallback.run();
+ }
+ }
+
+ @Override
+ public boolean willOccupySlotIndefinitely() {
+ return slotWillBeOccupiedIndefinitely;
+ }
+
+ private enum State {
+ ALLOCATED,
+ RELEASED
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotAllocator.java
new file mode 100644
index 0000000..3eb9eca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotAllocator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+/** Component for calculating the slot requirements and mapping of vertices to slots. */
+public interface SlotAllocator<T extends VertexParallelism> {
+
+ /**
+ * Calculates the total resources required for scheduling the given vertices.
+ *
+ * @param vertices vertices to schedule
+ * @return required resources
+ */
+ ResourceCounter calculateRequiredSlots(Iterable<JobInformation.VertexInformation> vertices);
+
+ /**
+ * Determines the parallelism at which the vertices could be scheduled given the collection of
+ * slots. This method may be called with any number of slots providing any amount of resources,
+ * irrespective of what {@link #calculateRequiredSlots(Iterable)} returned.
+ *
+ * <p>If a {@link VertexParallelism} is returned then it covers all vertices contained in the
+ * given job information.
+ *
+ * <p>A returned {@link VertexParallelism} should be directly consumed afterwards (by either
+ * discarding it or calling {@link #reserveResources(VertexParallelism)}), as there is no
+ * guarantee that the assignment remains valid over time (because slots can be lost).
+ *
+ * <p>Implementations of this method must be side-effect free. There is no guarantee that the
+ * result of this method is ever passed to {@link #reserveResources(VertexParallelism)}.
+ *
+ * @param jobInformation information about the job graph
+ * @param slots slots to consider for determining the parallelism
+ * @return potential parallelism for all vertices and implementation-specific information for
+ * how the vertices could be assigned to slots, if all vertices could be run with the given
+ * slots
+ */
+ Optional<T> determineParallelism(
+ JobInformation jobInformation, Collection<? extends SlotInfo> slots);
+
+ /**
+ * Reserves slots according to the given assignment.
+ *
+ * @param vertexParallelism information on how vertices should be assigned to the slots
+ * @return mapping of vertices to slots
+ */
+ Map<ExecutionVertexID, LogicalSlot> reserveResources(T vertexParallelism);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java
new file mode 100644
index 0000000..e422fd5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SlotAllocator} implementation that supports slot sharing. */
+public class SlotSharingSlotAllocator implements SlotAllocator<VertexParallelismWithSlotSharing> {
+
+ private final ReserveSlotFunction reserveSlotFunction;
+ private final FreeSlotFunction freeSlotFunction;
+
+ public SlotSharingSlotAllocator(
+ ReserveSlotFunction reserveSlot, FreeSlotFunction freeSlotFunction) {
+ this.reserveSlotFunction = reserveSlot;
+ this.freeSlotFunction = freeSlotFunction;
+ }
+
+ @Override
+ public ResourceCounter calculateRequiredSlots(
+ Iterable<JobInformation.VertexInformation> vertices) {
+ int numTotalRequiredSlots = 0;
+ for (Integer requiredSlots : getMaxParallelismForSlotSharingGroups(vertices).values()) {
+ numTotalRequiredSlots += requiredSlots;
+ }
+ return ResourceCounter.withResource(ResourceProfile.UNKNOWN, numTotalRequiredSlots);
+ }
+
+ private static Map<SlotSharingGroupId, Integer> getMaxParallelismForSlotSharingGroups(
+ Iterable<JobInformation.VertexInformation> vertices) {
+ final Map<SlotSharingGroupId, Integer> maxParallelismForSlotSharingGroups = new HashMap<>();
+ for (JobInformation.VertexInformation vertex : vertices) {
+ maxParallelismForSlotSharingGroups.compute(
+ vertex.getSlotSharingGroup().getSlotSharingGroupId(),
+ (slotSharingGroupId, currentMaxParallelism) ->
+ currentMaxParallelism == null
+ ? vertex.getParallelism()
+ : Math.max(currentMaxParallelism, vertex.getParallelism()));
+ }
+ return maxParallelismForSlotSharingGroups;
+ }
+
+ @Override
+ public Optional<VertexParallelismWithSlotSharing> determineParallelism(
+ JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) {
+ // TODO: This can waste slots if the max parallelism for slot sharing groups is not equal
+ final int slotsPerSlotSharingGroup =
+ freeSlots.size() / jobInformation.getSlotSharingGroups().size();
+
+ if (slotsPerSlotSharingGroup == 0) {
+ // => less slots than slot-sharing groups
+ return Optional.empty();
+ }
+
+ final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
+
+ final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new ArrayList<>();
+ final Map<JobVertexID, Integer> allVertexParallelism = new HashMap<>();
+
+ for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+ final List<JobInformation.VertexInformation> containedJobVertices =
+ slotSharingGroup.getJobVertexIds().stream()
+ .map(jobInformation::getVertexInformation)
+ .collect(Collectors.toList());
+
+ final Map<JobVertexID, Integer> vertexParallelism =
+ determineParallelism(containedJobVertices, slotsPerSlotSharingGroup);
+
+ final Iterable<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
+ createExecutionSlotSharingGroups(vertexParallelism);
+
+ for (ExecutionSlotSharingGroup executionSlotSharingGroup :
+ sharedSlotToVertexAssignment) {
+ final SlotInfo slotInfo = slotIterator.next();
+
+ assignments.add(
+ new ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
+ }
+ allVertexParallelism.putAll(vertexParallelism);
+ }
+
+ return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
+ }
+
+ private static Map<JobVertexID, Integer> determineParallelism(
+ Collection<JobInformation.VertexInformation> containedJobVertices, int availableSlots) {
+ final Map<JobVertexID, Integer> vertexParallelism = new HashMap<>();
+ for (JobInformation.VertexInformation jobVertex : containedJobVertices) {
+ final int parallelism = Math.min(jobVertex.getParallelism(), availableSlots);
+
+ vertexParallelism.put(jobVertex.getJobVertexID(), parallelism);
+ }
+
+ return vertexParallelism;
+ }
+
+ private static Iterable<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
+ Map<JobVertexID, Integer> containedJobVertices) {
+ final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>();
+
+ for (Map.Entry<JobVertexID, Integer> jobVertex : containedJobVertices.entrySet()) {
+ for (int i = 0; i < jobVertex.getValue(); i++) {
+ sharedSlotToVertexAssignment
+ .computeIfAbsent(i, ignored -> new HashSet<>())
+ .add(new ExecutionVertexID(jobVertex.getKey(), i));
+ }
+ }
+
+ return sharedSlotToVertexAssignment.values().stream()
+ .map(ExecutionSlotSharingGroup::new)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Map<ExecutionVertexID, LogicalSlot> reserveResources(
+ VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing) {
+ final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new HashMap<>();
+
+ for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :
+ vertexParallelismWithSlotSharing.getAssignments()) {
+ final SharedSlot sharedSlot =
+ reserveSharedSlot(executionSlotSharingGroup.getSlotInfo());
+
+ for (ExecutionVertexID executionVertexId :
+ executionSlotSharingGroup
+ .getExecutionSlotSharingGroup()
+ .getContainedExecutionVertices()) {
+ final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+ assignedSlots.put(executionVertexId, logicalSlot);
+ }
+ }
+
+ return assignedSlots;
+ }
+
+ private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
+ final PhysicalSlot physicalSlot =
+ reserveSlotFunction.reserveSlot(
+ slotInfo.getAllocationId(), ResourceProfile.UNKNOWN);
+
+ return new SharedSlot(
+ new SlotRequestId(),
+ physicalSlot,
+ slotInfo.willBeOccupiedIndefinitely(),
+ () ->
+ freeSlotFunction.freeSlot(
+ slotInfo.getAllocationId(), null, System.currentTimeMillis()));
+ }
+
+ static class ExecutionSlotSharingGroup {
+ private final Set<ExecutionVertexID> containedExecutionVertices;
+
+ public ExecutionSlotSharingGroup(Set<ExecutionVertexID> containedExecutionVertices) {
+ this.containedExecutionVertices = containedExecutionVertices;
+ }
+
+ public Collection<ExecutionVertexID> getContainedExecutionVertices() {
+ return containedExecutionVertices;
+ }
+ }
+
+ static class ExecutionSlotSharingGroupAndSlot {
+ private final ExecutionSlotSharingGroup executionSlotSharingGroup;
+ private final SlotInfo slotInfo;
+
+ public ExecutionSlotSharingGroupAndSlot(
+ ExecutionSlotSharingGroup executionSlotSharingGroup, SlotInfo slotInfo) {
+ this.executionSlotSharingGroup = executionSlotSharingGroup;
+ this.slotInfo = slotInfo;
+ }
+
+ public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() {
+ return executionSlotSharingGroup;
+ }
+
+ public SlotInfo getSlotInfo() {
+ return slotInfo;
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelism.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelism.java
new file mode 100644
index 0000000..39a887d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelism.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)}, describing
+ * the parallelism each vertex could be scheduled with.
+ *
+ * <p>{@link SlotAllocator} implementations may encode additional information to be used in {@link
+ * SlotAllocator#reserveResources(VertexParallelism)}.
+ */
+public interface VertexParallelism {
+ Map<JobVertexID, Integer> getMaxParallelismForVertices();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelismWithSlotSharing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelismWithSlotSharing.java
new file mode 100644
index 0000000..84b4d59
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelismWithSlotSharing.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Map;
+
+/** {@link VertexParallelism} implementation for the {@link SlotSharingSlotAllocator}. */
+public class VertexParallelismWithSlotSharing implements VertexParallelism {
+
+ private final Map<JobVertexID, Integer> vertexParallelism;
+ private final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> assignments;
+
+ VertexParallelismWithSlotSharing(
+ Map<JobVertexID, Integer> vertexParallelism,
+ Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> assignments) {
+ this.vertexParallelism = vertexParallelism;
+ this.assignments = Preconditions.checkNotNull(assignments);
+ }
+
+ Iterable<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> getAssignments() {
+ return assignments;
+ }
+
+ @Override
+ public Map<JobVertexID, Integer> getMaxParallelismForVertices() {
+ return vertexParallelism;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
new file mode 100644
index 0000000..97dc1f4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Unit tests covering construction, logical slot handling and release of {@link SharedSlot}. */
+public class SharedSlotTest extends TestLogger {
+
+    /** Constructing a shared slot must leave a payload assigned on the physical slot. */
+    @Test
+    public void testConstructorAssignsPayload() {
+        final TestingPhysicalSlot slot = newPhysicalSlot();
+
+        newSharedSlot(slot);
+
+        assertThat(slot.getPayload(), not(nullValue()));
+    }
+
+    /** A physical slot that already carries a payload cannot back a new shared slot. */
+    @Test(expected = IllegalStateException.class)
+    public void testConstructorFailsIfSlotAlreadyHasAssignedPayload() {
+        final TestingPhysicalSlot occupiedSlot = newPhysicalSlot();
+        occupiedSlot.tryAssignPayload(new TestPhysicalSlotPayload());
+
+        newSharedSlot(occupiedSlot);
+    }
+
+    /** A freshly allocated logical slot mirrors the properties of the backing physical slot. */
+    @Test
+    public void testAllocateLogicalSlot() {
+        final TestingPhysicalSlot slot = newPhysicalSlot();
+
+        final LogicalSlot allocated = newSharedSlot(slot).allocateLogicalSlot();
+
+        assertThat(allocated.getAllocationId(), equalTo(slot.getAllocationId()));
+        assertThat(allocated.getLocality(), is(Locality.UNKNOWN));
+        assertThat(allocated.getPayload(), nullValue());
+        assertThat(allocated.getPhysicalSlotNumber(), is(slot.getPhysicalSlotNumber()));
+        assertThat(allocated.getTaskManagerLocation(), equalTo(slot.getTaskManagerLocation()));
+        assertThat(allocated.getTaskManagerGateway(), equalTo(slot.getTaskManagerGateway()));
+    }
+
+    /** Two logical slots handed out by the same shared slot carry different request ids. */
+    @Test
+    public void testAllocateLogicalSlotIssuesUniqueSlotRequestIds() {
+        final SharedSlot sharedSlot = newSharedSlot(newPhysicalSlot());
+
+        final LogicalSlot first = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot second = sharedSlot.allocateLogicalSlot();
+
+        assertThat(first.getSlotRequestId(), not(equalTo(second.getSlotRequestId())));
+    }
+
+    /** Returning a logical slot that has not been released beforehand is rejected. */
+    @Test(expected = IllegalStateException.class)
+    public void testReturnLogicalSlotRejectsAliveSlots() {
+        final SharedSlot sharedSlot = newSharedSlot(newPhysicalSlot());
+        final LogicalSlot aliveSlot = sharedSlot.allocateLogicalSlot();
+
+        sharedSlot.returnLogicalSlot(aliveSlot);
+    }
+
+    /** Returning a released logical slot that this shared slot never handed out is rejected. */
+    @Test(expected = IllegalStateException.class)
+    public void testReturnLogicalSlotRejectsUnknownSlot() {
+        final SharedSlot sharedSlot = newSharedSlot(newPhysicalSlot());
+        final LogicalSlot foreignSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
+        foreignSlot.releaseSlot(new Exception("test"));
+
+        sharedSlot.returnLogicalSlot(foreignSlot);
+    }
+
+    /** The external release callback only fires once the last logical slot has been returned. */
+    @Test
+    public void testReturnLogicalSlotTriggersExternalReleaseOnLastSlot() {
+        final AtomicBoolean releaseObserved = new AtomicBoolean(false);
+        final SharedSlot sharedSlot = newSharedSlot(newPhysicalSlot(), releaseObserved);
+        final LogicalSlot first = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot second = sharedSlot.allocateLogicalSlot();
+
+        // releasing a logical slot implicitly returns it to the shared slot
+        first.releaseSlot(new Exception("test"));
+        assertThat(releaseObserved.get(), is(false));
+
+        second.releaseSlot(new Exception("test"));
+        assertThat(releaseObserved.get(), is(true));
+    }
+
+    /** Releasing the shared slot itself invokes the external release callback. */
+    @Test
+    public void testReleaseTriggersExternalRelease() {
+        final AtomicBoolean releaseObserved = new AtomicBoolean(false);
+        final SharedSlot sharedSlot = newSharedSlot(newPhysicalSlot(), releaseObserved);
+
+        sharedSlot.release(new Exception("test"));
+
+        assertThat(releaseObserved.get(), is(true));
+    }
+
+    /** Releasing the shared slot leaves previously allocated logical slots no longer alive. */
+    @Test
+    public void testReleaseAlsoReleasesLogicalSlots() {
+        final SharedSlot sharedSlot = newSharedSlot(newPhysicalSlot());
+        final LogicalSlot allocated = sharedSlot.allocateLogicalSlot();
+
+        sharedSlot.release(new Exception("test"));
+
+        assertThat(allocated.isAlive(), is(false));
+    }
+
+    /** Once released, the shared slot refuses to hand out further logical slots. */
+    @Test(expected = IllegalStateException.class)
+    public void testReleaseForbidsSubsequentLogicalSlotAllocations() {
+        final SharedSlot sharedSlot = newSharedSlot(newPhysicalSlot());
+
+        sharedSlot.release(new Exception("test"));
+
+        sharedSlot.allocateLogicalSlot();
+    }
+
+    /** Builds a physical slot with the default settings of the testing builder. */
+    private static TestingPhysicalSlot newPhysicalSlot() {
+        return TestingPhysicalSlot.builder().build();
+    }
+
+    /** Wraps the given physical slot in a shared slot whose external release is a no-op. */
+    private static SharedSlot newSharedSlot(TestingPhysicalSlot physicalSlot) {
+        return new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+    }
+
+    /** Wraps the given physical slot in a shared slot that flags its external release. */
+    private static SharedSlot newSharedSlot(
+            TestingPhysicalSlot physicalSlot, AtomicBoolean releaseObserved) {
+        return new SharedSlot(
+                new SlotRequestId(), physicalSlot, false, () -> releaseObserved.set(true));
+    }
+
+    /** Payload that ignores releases and never claims the slot indefinitely. */
+    private static class TestPhysicalSlotPayload implements PhysicalSlot.Payload {
+
+        @Override
+        public void release(Throwable cause) {}
+
+        @Override
+        public boolean willOccupySlotIndefinitely() {
+            return false;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocatorTest.java
new file mode 100644
index 0000000..e505de4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocatorTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/** Unit tests for the {@link SlotSharingSlotAllocator}. */
+public class SlotSharingSlotAllocatorTest extends TestLogger {
+
+    private static final FreeSlotFunction TEST_FREE_SLOT_FUNCTION = (a, c, t) -> {};
+    private static final ReserveSlotFunction TEST_RESERVE_SLOT_FUNCTION =
+            (allocationId, resourceProfile) ->
+                    TestingPhysicalSlot.builder()
+                            .withAllocationID(allocationId)
+                            .withResourceProfile(resourceProfile)
+                            .build();
+
+    // vertex1 and vertex2 share a slot sharing group, vertex3 lives in a group of its own
+    private static final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
+    private static final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
+    private static final JobInformation.VertexInformation vertex1 =
+            new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex2 =
+            new TestVertexInformation(new JobVertexID(), 2, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex3 =
+            new TestVertexInformation(new JobVertexID(), 3, slotSharingGroup2);
+
+    /** Per group the largest parallelism counts; the groups' requirements are summed up. */
+    @Test
+    public void testCalculateRequiredSlots() {
+        final ResourceCounter requiredSlots =
+                createSlotAllocator()
+                        .calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3));
+
+        assertThat(requiredSlots.getResources(), contains(ResourceProfile.UNKNOWN));
+
+        final int expectedSlotCount =
+                Math.max(vertex1.getParallelism(), vertex2.getParallelism())
+                        + vertex3.getParallelism();
+        assertThat(requiredSlots.getResourceCount(ResourceProfile.UNKNOWN), is(expectedSlotCount));
+    }
+
+    /** With one slot per slot sharing group every vertex ends up with parallelism 1. */
+    @Test
+    public void testDetermineParallelismWithMinimumSlots() {
+        final VertexParallelism determinedParallelism =
+                createSlotAllocator().determineParallelism(createJobInformation(), getSlots(2)).get();
+
+        final Map<JobVertexID, Integer> parallelismPerVertex =
+                determinedParallelism.getMaxParallelismForVertices();
+
+        for (JobInformation.VertexInformation vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
+            assertThat(parallelismPerVertex.get(vertex.getJobVertexID()), is(1));
+        }
+    }
+
+    /** With plenty of slots every vertex is granted the parallelism it was configured with. */
+    @Test
+    public void testDetermineParallelismWithManySlots() {
+        final VertexParallelism determinedParallelism =
+                createSlotAllocator()
+                        .determineParallelism(createJobInformation(), getSlots(50))
+                        .get();
+
+        final Map<JobVertexID, Integer> parallelismPerVertex =
+                determinedParallelism.getMaxParallelismForVertices();
+
+        for (JobInformation.VertexInformation vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
+            assertThat(
+                    parallelismPerVertex.get(vertex.getJobVertexID()),
+                    is(vertex.getParallelism()));
+        }
+    }
+
+    /** A single slot cannot serve two slot sharing groups, so no parallelism is returned. */
+    @Test
+    public void testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
+        final Optional<VertexParallelismWithSlotSharing> determinedParallelism =
+                createSlotAllocator().determineParallelism(createJobInformation(), getSlots(1));
+
+        assertThat(determinedParallelism.isPresent(), is(false));
+    }
+
+    /** Reserved logical slots carry the allocation id of the slot they were assigned to. */
+    @Test
+    public void testReserveResources() {
+        final SlotSharingSlotAllocator slotAllocator = createSlotAllocator();
+
+        final VertexParallelismWithSlotSharing slotAssignments =
+                slotAllocator.determineParallelism(createJobInformation(), getSlots(50)).get();
+
+        final Map<ExecutionVertexID, LogicalSlot> reservedSlots =
+                slotAllocator.reserveResources(slotAssignments);
+
+        // flatten the group-level assignments into one expected slot per execution vertex
+        final Map<ExecutionVertexID, SlotInfo> expectedSlots = new HashMap<>();
+        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot groupAndSlot :
+                slotAssignments.getAssignments()) {
+            for (ExecutionVertexID executionVertexId :
+                    groupAndSlot.getExecutionSlotSharingGroup().getContainedExecutionVertices()) {
+                expectedSlots.put(executionVertexId, groupAndSlot.getSlotInfo());
+            }
+        }
+
+        expectedSlots.forEach(
+                (executionVertexId, backingSlot) -> {
+                    final LogicalSlot reservedSlot = reservedSlots.get(executionVertexId);
+
+                    assertThat(reservedSlot.getAllocationId(), is(backingSlot.getAllocationId()));
+                });
+    }
+
+    /** Creates an allocator wired to the testing reserve/free slot functions. */
+    private static SlotSharingSlotAllocator createSlotAllocator() {
+        return new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION);
+    }
+
+    /** Creates job information made up of the three test vertices. */
+    private static JobInformation createJobInformation() {
+        return new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
+    }
+
+    /** Creates the requested number of distinct {@link TestSlotInfo} instances. */
+    private static Collection<SlotInfo> getSlots(int count) {
+        final Collection<SlotInfo> slots = new ArrayList<>();
+        while (slots.size() < count) {
+            slots.add(new TestSlotInfo());
+        }
+        return slots;
+    }
+
+    /** {@link JobInformation} backed by a fixed collection of vertices. */
+    private static class TestJobInformation implements JobInformation {
+
+        private final Map<JobVertexID, VertexInformation> vertexIdToInformation;
+        private final Collection<SlotSharingGroup> slotSharingGroups;
+
+        private TestJobInformation(Collection<VertexInformation> vertices) {
+            // index the vertices by id for lookups
+            this.vertexIdToInformation =
+                    vertices.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            VertexInformation::getJobVertexID,
+                                            Function.identity()));
+            // collecting into a set removes groups shared by several vertices
+            this.slotSharingGroups =
+                    vertices.stream()
+                            .map(VertexInformation::getSlotSharingGroup)
+                            .collect(Collectors.toSet());
+        }
+
+        @Override
+        public Collection<SlotSharingGroup> getSlotSharingGroups() {
+            return slotSharingGroups;
+        }
+
+        @Override
+        public VertexInformation getVertexInformation(JobVertexID jobVertexId) {
+            return vertexIdToInformation.get(jobVertexId);
+        }
+    }
+
+    /** {@link JobInformation.VertexInformation} that registers itself with its group. */
+    private static class TestVertexInformation implements JobInformation.VertexInformation {
+
+        private final JobVertexID jobVertexId;
+        private final int parallelism;
+        private final SlotSharingGroup slotSharingGroup;
+
+        private TestVertexInformation(
+                JobVertexID jobVertexId, int parallelism, SlotSharingGroup slotSharingGroup) {
+            this.jobVertexId = jobVertexId;
+            this.parallelism = parallelism;
+            this.slotSharingGroup = slotSharingGroup;
+            // side effect: the passed group learns about this vertex
+            this.slotSharingGroup.addVertexToGroup(this.jobVertexId, ResourceSpec.UNKNOWN);
+        }
+
+        @Override
+        public JobVertexID getJobVertexID() {
+            return jobVertexId;
+        }
+
+        @Override
+        public int getParallelism() {
+            return parallelism;
+        }
+
+        @Override
+        public SlotSharingGroup getSlotSharingGroup() {
+            return slotSharingGroup;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/TestSlotInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/TestSlotInfo.java
new file mode 100644
index 0000000..e56be3a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/TestSlotInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Test {@link SlotInfo} implementation.
+ *
+ * <p>Every instance has its own {@link AllocationID} and {@link TaskManagerLocation}; both are
+ * created once at construction time and stay the same for the lifetime of the instance. The
+ * remaining properties are fixed constants.
+ */
+class TestSlotInfo implements SlotInfo {
+
+    private final AllocationID allocationId = new AllocationID();
+
+    // Created once per instance so that repeated lookups on the same slot report the same
+    // location, in line with how the allocation id is handled.
+    private final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+    /** Returns the allocation id created for this instance. */
+    @Override
+    public AllocationID getAllocationId() {
+        return allocationId;
+    }
+
+    /** Returns the location created for this instance; the same object on every call. */
+    @Override
+    public TaskManagerLocation getTaskManagerLocation() {
+        return taskManagerLocation;
+    }
+
+    /** Always returns slot number 0. */
+    @Override
+    public int getPhysicalSlotNumber() {
+        return 0;
+    }
+
+    /** Always returns {@link ResourceProfile#ANY}. */
+    @Override
+    public ResourceProfile getResourceProfile() {
+        return ResourceProfile.ANY;
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public boolean willBeOccupiedIndefinitely() {
+        return false;
+    }
+}