You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/09/11 14:25:26 UTC

[flink] 02/03: [FLINK-17016][runtime] Use SlotSharingExecutionSlotAllocator for pipelined region scheduling

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d918cc3e660d9cdbb84a175c8bb93ce6a14e38dc
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon Aug 31 17:26:24 2020 +0800

    [FLINK-17016][runtime] Use SlotSharingExecutionSlotAllocator for pipelined region scheduling
---
 .../apache/flink/runtime/jobgraph/JobGraph.java    | 26 ++++++
 .../slotpool/PhysicalSlotProviderImpl.java         |  7 +-
 .../DefaultExecutionSlotAllocatorFactory.java      |  4 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  | 10 ++-
 .../runtime/scheduler/DefaultSchedulerFactory.java | 11 +--
 .../scheduler/ExecutionSlotAllocationContext.java  | 95 ++++++++++++++++++++++
 .../scheduler/ExecutionSlotAllocatorFactory.java   | 10 ++-
 .../OneSlotPerExecutionSlotAllocatorFactory.java   |  4 +-
 .../SlotSharingExecutionSlotAllocatorFactory.java  | 29 ++-----
 .../flink/runtime/jobgraph/JobGraphTest.java       | 45 ++++++++++
 .../TestExecutionSlotAllocatorFactory.java         |  2 +-
 11 files changed, 207 insertions(+), 36 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 95a9a28..ffa1502 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -26,7 +26,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -41,7 +44,9 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -300,6 +305,27 @@ public class JobGraph implements Serializable {
 		return this.taskVertices.size();
 	}
 
+	public Set<SlotSharingGroup> getSlotSharingGroups() {
+		final Set<SlotSharingGroup> slotSharingGroups = IterableUtils
+			.toStream(getVertices())
+			.map(JobVertex::getSlotSharingGroup)
+			.collect(Collectors.toSet());
+		return Collections.unmodifiableSet(slotSharingGroups);
+	}
+
+	public Set<CoLocationGroupDesc> getCoLocationGroupDescriptors() {
+		// invoke distinct() on CoLocationGroup first to avoid creating
+		// multiple CoLocationGroupDesc from one CoLocationGroup
+		final Set<CoLocationGroupDesc> coLocationGroups = IterableUtils
+			.toStream(getVertices())
+			.map(JobVertex::getCoLocationGroup)
+			.filter(Objects::nonNull)
+			.distinct()
+			.map(CoLocationGroupDesc::from)
+			.collect(Collectors.toSet());
+		return Collections.unmodifiableSet(coLocationGroups);
+	}
+
 	/**
 	 * Sets the settings for asynchronous snapshots. A value of {@code null} means that
 	 * snapshotting is not enabled.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
index 023405b..4841a79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
@@ -32,14 +32,17 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
+/**
+ * The provider serves physical slot requests.
+ */
+public class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
 	private static final Logger LOG = LoggerFactory.getLogger(PhysicalSlotProviderImpl.class);
 
 	private final SlotSelectionStrategy slotSelectionStrategy;
 
 	private final SlotPool slotPool;
 
-	PhysicalSlotProviderImpl(SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
+	public PhysicalSlotProviderImpl(SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
 		this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
 		this.slotPool = checkNotNull(slotPool);
 		slotPool.disableBatchSlotRequestTimeoutCheck();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
index 4324c3b..824e4cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
@@ -35,7 +35,7 @@ class DefaultExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFact
 	}
 
 	@Override
-	public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) {
-		return new DefaultExecutionSlotAllocator(slotProvider, preferredLocationsRetriever);
+	public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
+		return new DefaultExecutionSlotAllocator(slotProvider, context::getPreferredLocations);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index ac660ce..db748bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -160,7 +160,15 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			failoverStrategy,
 			restartBackoffTimeStrategy);
 		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
-		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(getPreferredLocationsRetriever());
+
+		final ExecutionSlotAllocationContext slotAllocationContext = new ExecutionSlotAllocationContext(
+			getPreferredLocationsRetriever(),
+			executionVertexID -> getExecutionVertex(executionVertexID).getResourceProfile(),
+			executionVertexID -> getExecutionVertex(executionVertexID).getLatestPriorAllocation(),
+			getSchedulingTopology(),
+			() -> getJobGraph().getSlotSharingGroups(),
+			() -> getJobGraph().getCoLocationGroupDescriptors());
+		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(slotAllocationContext);
 
 		this.verticesWaitingForRestart = new HashSet<>();
 		this.startUpAction = startUpAction;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 581e075..84f3143 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -35,9 +35,9 @@ import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
-import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProvider;
-import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProviderImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
@@ -164,10 +164,11 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
 		final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl
 			.createFromSlotPool(slotPool, SystemClock.getInstance());
-		final BulkSlotProvider bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool, bulkChecker);
-		final ExecutionSlotAllocatorFactory allocatorFactory = new OneSlotPerExecutionSlotAllocatorFactory(
-			bulkSlotProvider,
+		final PhysicalSlotProvider physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+		final ExecutionSlotAllocatorFactory allocatorFactory = new SlotSharingExecutionSlotAllocatorFactory(
+			physicalSlotProvider,
 			scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+			bulkChecker,
 			slotRequestTimeout);
 		return new DefaultSchedulerComponents(
 			new PipelinedRegionSchedulingStrategy.Factory(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
new file mode 100644
index 0000000..8180c1f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Context for slot allocation.
+ */
+class ExecutionSlotAllocationContext implements PreferredLocationsRetriever {
+
+	private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
+
+	private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
+
+	private final SchedulingTopology schedulingTopology;
+
+	private final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier;
+
+	private final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier;
+
+	ExecutionSlotAllocationContext(
+			final PreferredLocationsRetriever preferredLocationsRetriever,
+			final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
+			final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
+			final SchedulingTopology schedulingTopology,
+			final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier,
+			final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier) {
+
+		this.preferredLocationsRetriever = preferredLocationsRetriever;
+		this.resourceProfileRetriever = resourceProfileRetriever;
+		this.priorAllocationIdRetriever = priorAllocationIdRetriever;
+		this.schedulingTopology = schedulingTopology;
+		this.logicalSlotSharingGroupSupplier = logicalSlotSharingGroupSupplier;
+		this.coLocationGroupSupplier = coLocationGroupSupplier;
+	}
+
+	@Override
+	public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
+			final ExecutionVertexID executionVertexId,
+			final Set<ExecutionVertexID> producersToIgnore) {
+		return preferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore);
+	}
+
+	ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
+		return resourceProfileRetriever.apply(executionVertexId);
+	}
+
+	AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
+		return priorAllocationIdRetriever.apply(executionVertexId);
+	}
+
+	SchedulingTopology getSchedulingTopology() {
+		return schedulingTopology;
+	}
+
+	Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
+		return logicalSlotSharingGroupSupplier.get();
+	}
+
+	Set<CoLocationGroupDesc> getCoLocationGroups() {
+		return coLocationGroupSupplier.get();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
index 1dc82aa..7a1586b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
@@ -22,8 +22,14 @@ package org.apache.flink.runtime.scheduler;
 /**
  * Interface for {@link ExecutionSlotAllocator} factories.
  */
+@FunctionalInterface
 public interface ExecutionSlotAllocatorFactory {
 
-	ExecutionSlotAllocator createInstance(PreferredLocationsRetriever preferredLocationsRetriever);
-
+	/**
+	 * Instantiates the {@link ExecutionSlotAllocator}.
+	 *
+	 * @param context the context for slot allocation
+	 * @return The instantiated slot allocator
+	 */
+	ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
index 1824daf..b8a7f31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
@@ -45,10 +45,10 @@ class OneSlotPerExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorF
 	}
 
 	@Override
-	public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) {
+	public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
 		return new OneSlotPerExecutionSlotAllocator(
 			slotProvider,
-			preferredLocationsRetriever,
+			context::getPreferredLocations,
 			slotWillBeOccupiedIndefinitely,
 			allocationTimeout);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
index 8842c7c..392ee67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
@@ -19,14 +19,9 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
 import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-
-import java.util.function.Function;
 
 /**
  * Factory for {@link SlotSharingExecutionSlotAllocator}.
@@ -36,12 +31,6 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
 
 	private final boolean slotWillBeOccupiedIndefinitely;
 
-	private final SlotSharingStrategy slotSharingStrategy;
-
-	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
-
-	private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
-
 	private final PhysicalSlotRequestBulkChecker bulkChecker;
 
 	private final Time allocationTimeout;
@@ -49,25 +38,23 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
 	SlotSharingExecutionSlotAllocatorFactory(
 			PhysicalSlotProvider slotProvider,
 			boolean slotWillBeOccupiedIndefinitely,
-			SlotSharingStrategy slotSharingStrategy,
-			Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
-			Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
 			PhysicalSlotRequestBulkChecker bulkChecker,
 			Time allocationTimeout) {
 		this.slotProvider = slotProvider;
 		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
-		this.slotSharingStrategy = slotSharingStrategy;
-		this.resourceProfileRetriever = resourceProfileRetriever;
-		this.priorAllocationIdRetriever = priorAllocationIdRetriever;
 		this.bulkChecker = bulkChecker;
 		this.allocationTimeout = allocationTimeout;
 	}
 
 	@Override
-	public ExecutionSlotAllocator createInstance(PreferredLocationsRetriever preferredLocationsRetriever) {
+	public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
+		SlotSharingStrategy slotSharingStrategy = new LocalInputPreferredSlotSharingStrategy(
+			context.getSchedulingTopology(),
+			context.getLogicalSlotSharingGroups(),
+			context.getCoLocationGroups());
 		SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = new MergingSharedSlotProfileRetrieverFactory(
-			preferredLocationsRetriever,
-			priorAllocationIdRetriever);
+			context::getPreferredLocations,
+			context::getPriorAllocationId);
 		return new SlotSharingExecutionSlotAllocator(
 			slotProvider,
 			slotWillBeOccupiedIndefinitely,
@@ -75,6 +62,6 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
 			sharedSlotProfileRetrieverFactory,
 			bulkChecker,
 			allocationTimeout,
-			resourceProfileRetriever);
+			context::getResourceProfile);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index e1c8d5a..aa3e9da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
@@ -35,9 +37,12 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -361,4 +366,44 @@ public class JobGraphTest extends TestLogger {
 			checkpointCoordinatorConfiguration,
 			null);
 	}
+
+	@Test
+	public void testGetSlotSharingGroups() {
+		final JobVertex v1 = new JobVertex("1");
+		final JobVertex v2 = new JobVertex("2");
+		final JobVertex v3 = new JobVertex("3");
+		final JobVertex v4 = new JobVertex("4");
+
+		final SlotSharingGroup group1 = new SlotSharingGroup();
+		v1.setSlotSharingGroup(group1);
+		v2.setSlotSharingGroup(group1);
+
+		final SlotSharingGroup group2 = new SlotSharingGroup();
+		v3.setSlotSharingGroup(group2);
+		v4.setSlotSharingGroup(group2);
+
+		final JobGraph jobGraph = new JobGraph(v1, v2, v3, v4);
+
+		assertThat(jobGraph.getSlotSharingGroups(), containsInAnyOrder(group1, group2));
+	}
+
+	@Test
+	public void testGetCoLocationGroupDescriptors() {
+		final JobVertex v1 = new JobVertex("1");
+		final JobVertex v2 = new JobVertex("2");
+		final JobVertex v3 = new JobVertex("3");
+		final JobVertex v4 = new JobVertex("4");
+
+		final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+		v1.setSlotSharingGroup(slotSharingGroup);
+		v2.setSlotSharingGroup(slotSharingGroup);
+		v1.setStrictlyCoLocatedWith(v2);
+
+		final JobGraph jobGraph = new JobGraph(v1, v2, v3, v4);
+
+		assertThat(jobGraph.getCoLocationGroupDescriptors(), hasSize(1));
+
+		final CoLocationGroupDesc onlyCoLocationGroupDesc = jobGraph.getCoLocationGroupDescriptors().iterator().next();
+		assertThat(onlyCoLocationGroupDesc.getVertices(), containsInAnyOrder(v1.getID(), v2.getID()));
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
index a996686..39865c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
@@ -37,7 +37,7 @@ public class TestExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
 	}
 
 	@Override
-	public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever ignored) {
+	public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
 		return testExecutionSlotAllocator;
 	}