You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/08 00:38:01 UTC

[flink] 03/03: [FLINK-21098][coordination] Add SlotAllocator

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d1f099c3c022e9274171ddb1a8499b6b111c1656
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 28 11:30:37 2021 +0100

    [FLINK-21098][coordination] Add SlotAllocator
---
 .../declarative/allocator/FreeSlotFunction.java    |  38 ++++
 .../declarative/allocator/JobInformation.java      |  39 ++++
 .../declarative/allocator/ReserveSlotFunction.java |  35 +++
 .../declarative/allocator/SharedSlot.java          | 141 ++++++++++++
 .../declarative/allocator/SlotAllocator.java       |  71 ++++++
 .../allocator/SlotSharingSlotAllocator.java        | 215 ++++++++++++++++++
 .../declarative/allocator/VertexParallelism.java   |  34 +++
 .../VertexParallelismWithSlotSharing.java          |  47 ++++
 .../declarative/allocator/SharedSlotTest.java      | 180 ++++++++++++++++
 .../allocator/SlotSharingSlotAllocatorTest.java    | 239 +++++++++++++++++++++
 .../declarative/allocator/TestSlotInfo.java        |  55 +++++
 11 files changed, 1094 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/FreeSlotFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/FreeSlotFunction.java
new file mode 100644
index 0000000..a92ad6f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/FreeSlotFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import javax.annotation.Nullable;
+
+/** A function for freeing slots. */
+@FunctionalInterface
+public interface FreeSlotFunction {
+    /**
+     * Frees the slot identified by the given {@link AllocationID}.
+     *
+     * <p>If the slot is freed due to exceptional circumstances a {@link Throwable} cause should be
+     * provided.
+     *
+     * @param allocationId identifies the slot
+     * @param cause reason for why the slot was freed; null during normal operations
+     * @param timestamp when the slot was freed
+     */
+    void freeSlot(AllocationID allocationId, @Nullable Throwable cause, long timestamp);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
new file mode 100644
index 0000000..f9c9015
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/JobInformation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+
+import java.util.Collection;
+
+/** Information about the job. */
+public interface JobInformation {
+    Collection<SlotSharingGroup> getSlotSharingGroups();
+
+    VertexInformation getVertexInformation(JobVertexID jobVertexId);
+
+    /** Information about a single vertex. */
+    interface VertexInformation {
+        JobVertexID getJobVertexID();
+
+        int getParallelism();
+
+        SlotSharingGroup getSlotSharingGroup();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/ReserveSlotFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/ReserveSlotFunction.java
new file mode 100644
index 0000000..3a26ee4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/ReserveSlotFunction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+
+/** A function for reserving slots. */
+@FunctionalInterface
+public interface ReserveSlotFunction {
+    /**
+     * Reserves the slot identified by the given allocation ID for the given resource profile.
+     *
+     * @param allocationId identifies the slot
+     * @param resourceProfile resource profile the slot must be able to fulfill
+     * @return reserved slot
+     */
+    PhysicalSlot reserveSlot(AllocationID allocationId, ResourceProfile resourceProfile);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
new file mode 100644
index 0000000..90cce74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Shared slot implementation for the declarative scheduler. */
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
+    private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);
+
+    private final SlotRequestId physicalSlotRequestId;
+
+    private final PhysicalSlot physicalSlot;
+
+    private final Runnable externalReleaseCallback;
+
+    private final Map<SlotRequestId, LogicalSlot> allocatedLogicalSlots;
+
+    private final boolean slotWillBeOccupiedIndefinitely;
+
+    private State state;
+
+    public SharedSlot(
+            SlotRequestId physicalSlotRequestId,
+            PhysicalSlot physicalSlot,
+            boolean slotWillBeOccupiedIndefinitely,
+            Runnable externalReleaseCallback) {
+        this.physicalSlotRequestId = physicalSlotRequestId;
+        this.physicalSlot = physicalSlot;
+        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+        this.externalReleaseCallback = externalReleaseCallback;
+        this.allocatedLogicalSlots = new HashMap<>();
+
+        Preconditions.checkState(
+                physicalSlot.tryAssignPayload(this),
+                "The provided slot (%s) was not free.",
+                physicalSlot.getAllocationId());
+        this.state = State.ALLOCATED;
+    }
+
+    /**
+     * Registers an allocation request for a logical slot.
+     *
+     * @return the logical slot
+     */
+    public LogicalSlot allocateLogicalSlot() {
+        LOG.debug("Allocating logical slot from shared slot ({})", physicalSlotRequestId);
+        Preconditions.checkState(
+                state == State.ALLOCATED, "The shared slot has already been released.");
+
+        final LogicalSlot slot =
+                new SingleLogicalSlot(
+                        new SlotRequestId(),
+                        physicalSlot,
+                        Locality.UNKNOWN,
+                        this,
+                        slotWillBeOccupiedIndefinitely);
+
+        allocatedLogicalSlots.put(slot.getSlotRequestId(), slot);
+        return slot;
+    }
+
+    @Override
+    public void returnLogicalSlot(LogicalSlot logicalSlot) {
+        LOG.debug("Returning logical slot to shared slot ({})", physicalSlotRequestId);
+        Preconditions.checkState(
+                state == State.ALLOCATED, "The shared slot has already been released.");
+
+        Preconditions.checkState(!logicalSlot.isAlive(), "Returned logic slot must not be alive.");
+        Preconditions.checkState(
+                allocatedLogicalSlots.remove(logicalSlot.getSlotRequestId()) != null,
+                "Trying to remove a logical slot request which has been either already removed or never created.");
+        tryReleaseExternally();
+    }
+
+    @Override
+    public void release(Throwable cause) {
+        LOG.debug("Release shared slot ({})", physicalSlotRequestId);
+        Preconditions.checkState(
+                state == State.ALLOCATED, "The shared slot has already been released.");
+
+        // copy the logical slot collection to avoid ConcurrentModificationException
+        // if logical slot releases cause cancellation of other executions
+        // which will try to call returnLogicalSlot and modify allocatedLogicalSlots collection
+        final List<LogicalSlot> logicalSlotsToRelease =
+                new ArrayList<>(allocatedLogicalSlots.values());
+        for (LogicalSlot allocatedLogicalSlot : logicalSlotsToRelease) {
+            allocatedLogicalSlot.releaseSlot(cause);
+        }
+        allocatedLogicalSlots.clear();
+        tryReleaseExternally();
+    }
+
+    private void tryReleaseExternally() {
+        if (state != State.RELEASED && allocatedLogicalSlots.isEmpty()) {
+            state = State.RELEASED;
+            LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
+            externalReleaseCallback.run();
+        }
+    }
+
+    @Override
+    public boolean willOccupySlotIndefinitely() {
+        return slotWillBeOccupiedIndefinitely;
+    }
+
+    private enum State {
+        ALLOCATED,
+        RELEASED
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotAllocator.java
new file mode 100644
index 0000000..3eb9eca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotAllocator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+/** Component for calculating the slot requirements and mapping of vertices to slots. */
+public interface SlotAllocator<T extends VertexParallelism> {
+
+    /**
+     * Calculates the total resources required for scheduling the given vertices.
+     *
+     * @param vertices vertices to schedule
+     * @return required resources
+     */
+    ResourceCounter calculateRequiredSlots(Iterable<JobInformation.VertexInformation> vertices);
+
+    /**
+     * Determines the parallelism at which the vertices could be scheduled given the collection of
+     * slots. This method may be called with any number of slots providing any amount of resources,
+     * irrespective of what {@link #calculateRequiredSlots(Iterable)} returned.
+     *
+     * <p>If a {@link VertexParallelism} is returned then it covers all vertices contained in the
+     * given job information.
+     *
+     * <p>A returned {@link VertexParallelism} should be directly consumed afterwards (by either
+     * discarding it or calling {@link #reserveResources(VertexParallelism)}, as there is no
+     * guarantee that the assignment remains valid over time (because slots can be lost).
+     *
+     * <p>Implementations of this method must be side-effect free. There is no guarantee that the
+     * result of this method is ever passed to {@link #reserveResources(VertexParallelism)}.
+     *
+     * @param jobInformation information about the job graph
+     * @param slots slots to consider for determining the parallelism
+     * @return potential parallelism for all vertices and implementation-specific information for
+     *     how the vertices could be assigned to slots, if all vertices could be run with the given
+     *     slots
+     */
+    Optional<T> determineParallelism(
+            JobInformation jobInformation, Collection<? extends SlotInfo> slots);
+
+    /**
+     * Reserves slots according to the given assignment.
+     *
+     * @param vertexParallelism information on how slots should be assigned to the slots
+     * @return mapping of vertices to slots
+     */
+    Map<ExecutionVertexID, LogicalSlot> reserveResources(T vertexParallelism);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java
new file mode 100644
index 0000000..e422fd5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SlotAllocator} implementation that supports slot sharing. */
+public class SlotSharingSlotAllocator implements SlotAllocator<VertexParallelismWithSlotSharing> {
+
+    private final ReserveSlotFunction reserveSlotFunction;
+    private final FreeSlotFunction freeSlotFunction;
+
+    public SlotSharingSlotAllocator(
+            ReserveSlotFunction reserveSlot, FreeSlotFunction freeSlotFunction) {
+        this.reserveSlotFunction = reserveSlot;
+        this.freeSlotFunction = freeSlotFunction;
+    }
+
+    @Override
+    public ResourceCounter calculateRequiredSlots(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        int numTotalRequiredSlots = 0;
+        for (Integer requiredSlots : getMaxParallelismForSlotSharingGroups(vertices).values()) {
+            numTotalRequiredSlots += requiredSlots;
+        }
+        return ResourceCounter.withResource(ResourceProfile.UNKNOWN, numTotalRequiredSlots);
+    }
+
+    private static Map<SlotSharingGroupId, Integer> getMaxParallelismForSlotSharingGroups(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        final Map<SlotSharingGroupId, Integer> maxParallelismForSlotSharingGroups = new HashMap<>();
+        for (JobInformation.VertexInformation vertex : vertices) {
+            maxParallelismForSlotSharingGroups.compute(
+                    vertex.getSlotSharingGroup().getSlotSharingGroupId(),
+                    (slotSharingGroupId, currentMaxParallelism) ->
+                            currentMaxParallelism == null
+                                    ? vertex.getParallelism()
+                                    : Math.max(currentMaxParallelism, vertex.getParallelism()));
+        }
+        return maxParallelismForSlotSharingGroups;
+    }
+
+    @Override
+    public Optional<VertexParallelismWithSlotSharing> determineParallelism(
+            JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) {
+        // TODO: This can waste slots if the max parallelism for slot sharing groups is not equal
+        final int slotsPerSlotSharingGroup =
+                freeSlots.size() / jobInformation.getSlotSharingGroups().size();
+
+        if (slotsPerSlotSharingGroup == 0) {
+            // => less slots than slot-sharing groups
+            return Optional.empty();
+        }
+
+        final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
+
+        final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new ArrayList<>();
+        final Map<JobVertexID, Integer> allVertexParallelism = new HashMap<>();
+
+        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+            final List<JobInformation.VertexInformation> containedJobVertices =
+                    slotSharingGroup.getJobVertexIds().stream()
+                            .map(jobInformation::getVertexInformation)
+                            .collect(Collectors.toList());
+
+            final Map<JobVertexID, Integer> vertexParallelism =
+                    determineParallelism(containedJobVertices, slotsPerSlotSharingGroup);
+
+            final Iterable<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
+                    createExecutionSlotSharingGroups(vertexParallelism);
+
+            for (ExecutionSlotSharingGroup executionSlotSharingGroup :
+                    sharedSlotToVertexAssignment) {
+                final SlotInfo slotInfo = slotIterator.next();
+
+                assignments.add(
+                        new ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
+            }
+            allVertexParallelism.putAll(vertexParallelism);
+        }
+
+        return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
+    }
+
+    private static Map<JobVertexID, Integer> determineParallelism(
+            Collection<JobInformation.VertexInformation> containedJobVertices, int availableSlots) {
+        final Map<JobVertexID, Integer> vertexParallelism = new HashMap<>();
+        for (JobInformation.VertexInformation jobVertex : containedJobVertices) {
+            final int parallelism = Math.min(jobVertex.getParallelism(), availableSlots);
+
+            vertexParallelism.put(jobVertex.getJobVertexID(), parallelism);
+        }
+
+        return vertexParallelism;
+    }
+
+    private static Iterable<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
+            Map<JobVertexID, Integer> containedJobVertices) {
+        final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>();
+
+        for (Map.Entry<JobVertexID, Integer> jobVertex : containedJobVertices.entrySet()) {
+            for (int i = 0; i < jobVertex.getValue(); i++) {
+                sharedSlotToVertexAssignment
+                        .computeIfAbsent(i, ignored -> new HashSet<>())
+                        .add(new ExecutionVertexID(jobVertex.getKey(), i));
+            }
+        }
+
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Map<ExecutionVertexID, LogicalSlot> reserveResources(
+            VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing) {
+        final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new HashMap<>();
+
+        for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :
+                vertexParallelismWithSlotSharing.getAssignments()) {
+            final SharedSlot sharedSlot =
+                    reserveSharedSlot(executionSlotSharingGroup.getSlotInfo());
+
+            for (ExecutionVertexID executionVertexId :
+                    executionSlotSharingGroup
+                            .getExecutionSlotSharingGroup()
+                            .getContainedExecutionVertices()) {
+                final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+                assignedSlots.put(executionVertexId, logicalSlot);
+            }
+        }
+
+        return assignedSlots;
+    }
+
+    private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
+        final PhysicalSlot physicalSlot =
+                reserveSlotFunction.reserveSlot(
+                        slotInfo.getAllocationId(), ResourceProfile.UNKNOWN);
+
+        return new SharedSlot(
+                new SlotRequestId(),
+                physicalSlot,
+                slotInfo.willBeOccupiedIndefinitely(),
+                () ->
+                        freeSlotFunction.freeSlot(
+                                slotInfo.getAllocationId(), null, System.currentTimeMillis()));
+    }
+
+    static class ExecutionSlotSharingGroup {
+        private final Set<ExecutionVertexID> containedExecutionVertices;
+
+        public ExecutionSlotSharingGroup(Set<ExecutionVertexID> containedExecutionVertices) {
+            this.containedExecutionVertices = containedExecutionVertices;
+        }
+
+        public Collection<ExecutionVertexID> getContainedExecutionVertices() {
+            return containedExecutionVertices;
+        }
+    }
+
+    static class ExecutionSlotSharingGroupAndSlot {
+        private final ExecutionSlotSharingGroup executionSlotSharingGroup;
+        private final SlotInfo slotInfo;
+
+        public ExecutionSlotSharingGroupAndSlot(
+                ExecutionSlotSharingGroup executionSlotSharingGroup, SlotInfo slotInfo) {
+            this.executionSlotSharingGroup = executionSlotSharingGroup;
+            this.slotInfo = slotInfo;
+        }
+
+        public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() {
+            return executionSlotSharingGroup;
+        }
+
+        public SlotInfo getSlotInfo() {
+            return slotInfo;
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelism.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelism.java
new file mode 100644
index 0000000..39a887d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelism.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)}, describing
+ * the parallelism each vertex could be scheduled with.
+ *
+ * <p>{@link SlotAllocator} implementations may encode additional information to be used in {@link
+ * SlotAllocator#reserveResources(VertexParallelism)}.
+ */
+public interface VertexParallelism {
+    Map<JobVertexID, Integer> getMaxParallelismForVertices();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelismWithSlotSharing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelismWithSlotSharing.java
new file mode 100644
index 0000000..84b4d59
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/VertexParallelismWithSlotSharing.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Map;
+
+/** {@link VertexParallelism} implementation for the {@link SlotSharingSlotAllocator}. */
+public class VertexParallelismWithSlotSharing implements VertexParallelism {
+
+    private final Map<JobVertexID, Integer> vertexParallelism;
+    private final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> assignments;
+
+    VertexParallelismWithSlotSharing(
+            Map<JobVertexID, Integer> vertexParallelism,
+            Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> assignments) {
+        this.vertexParallelism = vertexParallelism;
+        this.assignments = Preconditions.checkNotNull(assignments);
+    }
+
+    Iterable<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> getAssignments() {
+        return assignments;
+    }
+
+    @Override
+    public Map<JobVertexID, Integer> getMaxParallelismForVertices() {
+        return vertexParallelism;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
new file mode 100644
index 0000000..97dc1f4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SharedSlot}. */
+public class SharedSlotTest extends TestLogger {
+
+    @Test
+    public void testConstructorAssignsPayload() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        assertThat(physicalSlot.getPayload(), not(nullValue()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testConstructorFailsIfSlotAlreadyHasAssignedPayload() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        physicalSlot.tryAssignPayload(new TestPhysicalSlotPayload());
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+    }
+
+    @Test
+    public void testAllocateLogicalSlot() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot.getAllocationId(), equalTo(physicalSlot.getAllocationId()));
+        assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
+        assertThat(logicalSlot.getPayload(), nullValue());
+        assertThat(logicalSlot.getPhysicalSlotNumber(), is(physicalSlot.getPhysicalSlotNumber()));
+        assertThat(
+                logicalSlot.getTaskManagerLocation(),
+                equalTo(physicalSlot.getTaskManagerLocation()));
+        assertThat(
+                logicalSlot.getTaskManagerGateway(), equalTo(physicalSlot.getTaskManagerGateway()));
+    }
+
+    @Test
+    public void testAllocateLogicalSlotIssuesUniqueSlotRequestIds() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        final LogicalSlot logicalSlot1 = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot logicalSlot2 = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot1.getSlotRequestId(), not(equalTo(logicalSlot2.getSlotRequestId())));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testReturnLogicalSlotRejectsAliveSlots() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+        final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+
+        sharedSlot.returnLogicalSlot(logicalSlot);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testReturnLogicalSlotRejectsUnknownSlot() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+        final LogicalSlot logicalSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
+        logicalSlot.releaseSlot(new Exception("test"));
+
+        sharedSlot.returnLogicalSlot(logicalSlot);
+    }
+
+    @Test
+    public void testReturnLogicalSlotTriggersExternalReleaseOnLastSlot() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final AtomicBoolean externalReleaseInitiated = new AtomicBoolean(false);
+        final SharedSlot sharedSlot =
+                new SharedSlot(
+                        new SlotRequestId(),
+                        physicalSlot,
+                        false,
+                        () -> externalReleaseInitiated.set(true));
+        final LogicalSlot logicalSlot1 = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot logicalSlot2 = sharedSlot.allocateLogicalSlot();
+
+        // this implicitly returns the slot
+        logicalSlot1.releaseSlot(new Exception("test"));
+        assertThat(externalReleaseInitiated.get(), is(false));
+
+        logicalSlot2.releaseSlot(new Exception("test"));
+        assertThat(externalReleaseInitiated.get(), is(true));
+    }
+
+    @Test
+    public void testReleaseTriggersExternalRelease() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final AtomicBoolean externalReleaseInitiated = new AtomicBoolean(false);
+        final SharedSlot sharedSlot =
+                new SharedSlot(
+                        new SlotRequestId(),
+                        physicalSlot,
+                        false,
+                        () -> externalReleaseInitiated.set(true));
+
+        sharedSlot.release(new Exception("test"));
+
+        assertThat(externalReleaseInitiated.get(), is(true));
+    }
+
+    @Test
+    public void testReleaseAlsoReleasesLogicalSlots() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+        final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+
+        sharedSlot.release(new Exception("test"));
+
+        assertThat(logicalSlot.isAlive(), is(false));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testReleaseForbidsSubsequentLogicalSlotAllocations() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        sharedSlot.release(new Exception("test"));
+
+        sharedSlot.allocateLogicalSlot();
+    }
+
+    private static class TestPhysicalSlotPayload implements PhysicalSlot.Payload {
+
+        @Override
+        public void release(Throwable cause) {}
+
+        @Override
+        public boolean willOccupySlotIndefinitely() {
+            return false;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocatorTest.java
new file mode 100644
index 0000000..e505de4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocatorTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SlotSharingSlotAllocator}. */
+public class SlotSharingSlotAllocatorTest extends TestLogger {
+
+    private static final FreeSlotFunction TEST_FREE_SLOT_FUNCTION = (a, c, t) -> {};
+    private static final ReserveSlotFunction TEST_RESERVE_SLOT_FUNCTION =
+            (allocationId, resourceProfile) ->
+                    TestingPhysicalSlot.builder()
+                            .withAllocationID(allocationId)
+                            .withResourceProfile(resourceProfile)
+                            .build();
+
+    private static final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
+    private static final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
+    private static final JobInformation.VertexInformation vertex1 =
+            new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex2 =
+            new TestVertexInformation(new JobVertexID(), 2, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex3 =
+            new TestVertexInformation(new JobVertexID(), 3, slotSharingGroup2);
+
+    @Test
+    public void testCalculateRequiredSlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION);
+
+        final ResourceCounter resourceCounter =
+                slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3));
+
+        assertThat(resourceCounter.getResources(), contains(ResourceProfile.UNKNOWN));
+        assertThat(
+                resourceCounter.getResourceCount(ResourceProfile.UNKNOWN),
+                is(
+                        Math.max(vertex1.getParallelism(), vertex2.getParallelism())
+                                + vertex3.getParallelism()));
+    }
+
+    @Test
+    public void testDetermineParallelismWithMinimumSlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
+
+        final VertexParallelism slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, getSlots(2)).get();
+
+        final Map<JobVertexID, Integer> maxParallelismForVertices =
+                slotSharingAssignments.getMaxParallelismForVertices();
+
+        assertThat(maxParallelismForVertices.get(vertex1.getJobVertexID()), is(1));
+        assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID()), is(1));
+        assertThat(maxParallelismForVertices.get(vertex3.getJobVertexID()), is(1));
+    }
+
+    @Test
+    public void testDetermineParallelismWithManySlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
+
+        final VertexParallelism slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, getSlots(50)).get();
+
+        final Map<JobVertexID, Integer> maxParallelismForVertices =
+                slotSharingAssignments.getMaxParallelismForVertices();
+
+        assertThat(
+                maxParallelismForVertices.get(vertex1.getJobVertexID()),
+                is(vertex1.getParallelism()));
+        assertThat(
+                maxParallelismForVertices.get(vertex2.getJobVertexID()),
+                is(vertex2.getParallelism()));
+        assertThat(
+                maxParallelismForVertices.get(vertex3.getJobVertexID()),
+                is(vertex3.getParallelism()));
+    }
+
+    @Test
+    public void testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
+
+        final Optional<VertexParallelismWithSlotSharing> slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, getSlots(1));
+
+        assertThat(slotSharingAssignments.isPresent(), is(false));
+    }
+
+    @Test
+    public void testReserveResources() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
+
+        final VertexParallelismWithSlotSharing slotAssignments =
+                slotAllocator.determineParallelism(jobInformation, getSlots(50)).get();
+
+        final Map<ExecutionVertexID, LogicalSlot> assignedResources =
+                slotAllocator.reserveResources(slotAssignments);
+
+        final Map<ExecutionVertexID, SlotInfo> expectedAssignments = new HashMap<>();
+        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot assignment :
+                slotAssignments.getAssignments()) {
+            for (ExecutionVertexID containedExecutionVertex :
+                    assignment.getExecutionSlotSharingGroup().getContainedExecutionVertices()) {
+                expectedAssignments.put(containedExecutionVertex, assignment.getSlotInfo());
+            }
+        }
+
+        for (Map.Entry<ExecutionVertexID, SlotInfo> expectedAssignment :
+                expectedAssignments.entrySet()) {
+            final LogicalSlot assignedSlot = assignedResources.get(expectedAssignment.getKey());
+
+            final SlotInfo backingSlot = expectedAssignment.getValue();
+
+            assertThat(assignedSlot.getAllocationId(), is(backingSlot.getAllocationId()));
+        }
+    }
+
+    private static Collection<SlotInfo> getSlots(int count) {
+        final Collection<SlotInfo> slotInfo = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            slotInfo.add(new TestSlotInfo());
+        }
+        return slotInfo;
+    }
+
+    private static class TestJobInformation implements JobInformation {
+
+        private final Map<JobVertexID, VertexInformation> vertexIdToInformation;
+        private final Collection<SlotSharingGroup> slotSharingGroups;
+
+        private TestJobInformation(Collection<VertexInformation> vertexIdToInformation) {
+            this.vertexIdToInformation =
+                    vertexIdToInformation.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            VertexInformation::getJobVertexID,
+                                            Function.identity()));
+            this.slotSharingGroups =
+                    vertexIdToInformation.stream()
+                            .map(VertexInformation::getSlotSharingGroup)
+                            .collect(Collectors.toSet());
+        }
+
+        @Override
+        public Collection<SlotSharingGroup> getSlotSharingGroups() {
+            return slotSharingGroups;
+        }
+
+        @Override
+        public VertexInformation getVertexInformation(JobVertexID jobVertexId) {
+            return vertexIdToInformation.get(jobVertexId);
+        }
+    }
+
+    private static class TestVertexInformation implements JobInformation.VertexInformation {
+
+        private final JobVertexID jobVertexId;
+        private final int parallelism;
+        private final SlotSharingGroup slotSharingGroup;
+
+        private TestVertexInformation(
+                JobVertexID jobVertexId, int parallelism, SlotSharingGroup slotSharingGroup) {
+            this.jobVertexId = jobVertexId;
+            this.parallelism = parallelism;
+            this.slotSharingGroup = slotSharingGroup;
+            slotSharingGroup.addVertexToGroup(jobVertexId, ResourceSpec.UNKNOWN);
+        }
+
+        @Override
+        public JobVertexID getJobVertexID() {
+            return jobVertexId;
+        }
+
+        @Override
+        public int getParallelism() {
+            return parallelism;
+        }
+
+        @Override
+        public SlotSharingGroup getSlotSharingGroup() {
+            return slotSharingGroup;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/TestSlotInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/TestSlotInfo.java
new file mode 100644
index 0000000..e56be3a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/TestSlotInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/** Test {@link SlotInfo} implementation. */
+class TestSlotInfo implements SlotInfo {
+
+    private final AllocationID allocationId = new AllocationID();
+
+    @Override
+    public AllocationID getAllocationId() {
+        return allocationId;
+    }
+
+    @Override
+    public TaskManagerLocation getTaskManagerLocation() {
+        return new LocalTaskManagerLocation();
+    }
+
+    @Override
+    public int getPhysicalSlotNumber() {
+        return 0;
+    }
+
+    @Override
+    public ResourceProfile getResourceProfile() {
+        return ResourceProfile.ANY;
+    }
+
+    @Override
+    public boolean willBeOccupiedIndefinitely() {
+        return false;
+    }
+}