You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/09/11 14:25:26 UTC
[flink] 02/03: [FLINK-17016][runtime] Use
SlotSharingExecutionSlotAllocator for pipelined region scheduling
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d918cc3e660d9cdbb84a175c8bb93ce6a14e38dc
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon Aug 31 17:26:24 2020 +0800
[FLINK-17016][runtime] Use SlotSharingExecutionSlotAllocator for pipelined region scheduling
---
.../apache/flink/runtime/jobgraph/JobGraph.java | 26 ++++++
.../slotpool/PhysicalSlotProviderImpl.java | 7 +-
.../DefaultExecutionSlotAllocatorFactory.java | 4 +-
.../flink/runtime/scheduler/DefaultScheduler.java | 10 ++-
.../runtime/scheduler/DefaultSchedulerFactory.java | 11 +--
.../scheduler/ExecutionSlotAllocationContext.java | 95 ++++++++++++++++++++++
.../scheduler/ExecutionSlotAllocatorFactory.java | 10 ++-
.../OneSlotPerExecutionSlotAllocatorFactory.java | 4 +-
.../SlotSharingExecutionSlotAllocatorFactory.java | 29 ++-----
.../flink/runtime/jobgraph/JobGraphTest.java | 45 ++++++++++
.../TestExecutionSlotAllocatorFactory.java | 2 +-
11 files changed, 207 insertions(+), 36 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 95a9a28..ffa1502 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -26,7 +26,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.SerializedValue;
import java.io.IOException;
@@ -41,7 +44,9 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -300,6 +305,27 @@ public class JobGraph implements Serializable {
return this.taskVertices.size();
}
+ public Set<SlotSharingGroup> getSlotSharingGroups() {
+ final Set<SlotSharingGroup> slotSharingGroups = IterableUtils
+ .toStream(getVertices())
+ .map(JobVertex::getSlotSharingGroup)
+ .collect(Collectors.toSet());
+ return Collections.unmodifiableSet(slotSharingGroups);
+ }
+
+ public Set<CoLocationGroupDesc> getCoLocationGroupDescriptors() {
+ // invoke distinct() on CoLocationGroup first to avoid creating
+ // multiple CoLocationGroupDec from one CoLocationGroup
+ final Set<CoLocationGroupDesc> coLocationGroups = IterableUtils
+ .toStream(getVertices())
+ .map(JobVertex::getCoLocationGroup)
+ .filter(Objects::nonNull)
+ .distinct()
+ .map(CoLocationGroupDesc::from)
+ .collect(Collectors.toSet());
+ return Collections.unmodifiableSet(coLocationGroups);
+ }
+
/**
* Sets the settings for asynchronous snapshots. A value of {@code null} means that
* snapshotting is not enabled.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
index 023405b..4841a79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
@@ -32,14 +32,17 @@ import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
-class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
+/**
+ * The provider serves physical slot requests.
+ */
+public class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
private static final Logger LOG = LoggerFactory.getLogger(PhysicalSlotProviderImpl.class);
private final SlotSelectionStrategy slotSelectionStrategy;
private final SlotPool slotPool;
- PhysicalSlotProviderImpl(SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
+ public PhysicalSlotProviderImpl(SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
this.slotPool = checkNotNull(slotPool);
slotPool.disableBatchSlotRequestTimeoutCheck();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
index 4324c3b..824e4cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
@@ -35,7 +35,7 @@ class DefaultExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFact
}
@Override
- public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) {
- return new DefaultExecutionSlotAllocator(slotProvider, preferredLocationsRetriever);
+ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
+ return new DefaultExecutionSlotAllocator(slotProvider, context::getPreferredLocations);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index ac660ce..db748bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -160,7 +160,15 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
failoverStrategy,
restartBackoffTimeStrategy);
this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
- this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(getPreferredLocationsRetriever());
+
+ final ExecutionSlotAllocationContext slotAllocationContext = new ExecutionSlotAllocationContext(
+ getPreferredLocationsRetriever(),
+ executionVertexID -> getExecutionVertex(executionVertexID).getResourceProfile(),
+ executionVertexID -> getExecutionVertex(executionVertexID).getLatestPriorAllocation(),
+ getSchedulingTopology(),
+ () -> getJobGraph().getSlotSharingGroups(),
+ () -> getJobGraph().getCoLocationGroupDescriptors());
+ this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(slotAllocationContext);
this.verticesWaitingForRestart = new HashSet<>();
this.startUpAction = startUpAction;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 581e075..84f3143 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -35,9 +35,9 @@ import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
-import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProvider;
-import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
@@ -164,10 +164,11 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl
.createFromSlotPool(slotPool, SystemClock.getInstance());
- final BulkSlotProvider bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool, bulkChecker);
- final ExecutionSlotAllocatorFactory allocatorFactory = new OneSlotPerExecutionSlotAllocatorFactory(
- bulkSlotProvider,
+ final PhysicalSlotProvider physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+ final ExecutionSlotAllocatorFactory allocatorFactory = new SlotSharingExecutionSlotAllocatorFactory(
+ physicalSlotProvider,
scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+ bulkChecker,
slotRequestTimeout);
return new DefaultSchedulerComponents(
new PipelinedRegionSchedulingStrategy.Factory(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
new file mode 100644
index 0000000..8180c1f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Context for slot allocation.
+ */
+class ExecutionSlotAllocationContext implements PreferredLocationsRetriever {
+
+ private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+ private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
+
+ private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
+
+ private final SchedulingTopology schedulingTopology;
+
+ private final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier;
+
+ private final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier;
+
+ ExecutionSlotAllocationContext(
+ final PreferredLocationsRetriever preferredLocationsRetriever,
+ final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
+ final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
+ final SchedulingTopology schedulingTopology,
+ final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier,
+ final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier) {
+
+ this.preferredLocationsRetriever = preferredLocationsRetriever;
+ this.resourceProfileRetriever = resourceProfileRetriever;
+ this.priorAllocationIdRetriever = priorAllocationIdRetriever;
+ this.schedulingTopology = schedulingTopology;
+ this.logicalSlotSharingGroupSupplier = logicalSlotSharingGroupSupplier;
+ this.coLocationGroupSupplier = coLocationGroupSupplier;
+ }
+
+ @Override
+ public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
+ final ExecutionVertexID executionVertexId,
+ final Set<ExecutionVertexID> producersToIgnore) {
+ return preferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore);
+ }
+
+ ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
+ return resourceProfileRetriever.apply(executionVertexId);
+ }
+
+ AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
+ return priorAllocationIdRetriever.apply(executionVertexId);
+ }
+
+ SchedulingTopology getSchedulingTopology() {
+ return schedulingTopology;
+ }
+
+ Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
+ return logicalSlotSharingGroupSupplier.get();
+ }
+
+ Set<CoLocationGroupDesc> getCoLocationGroups() {
+ return coLocationGroupSupplier.get();
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
index 1dc82aa..7a1586b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
@@ -22,8 +22,14 @@ package org.apache.flink.runtime.scheduler;
/**
* Interface for {@link ExecutionSlotAllocator} factories.
*/
+@FunctionalInterface
public interface ExecutionSlotAllocatorFactory {
- ExecutionSlotAllocator createInstance(PreferredLocationsRetriever preferredLocationsRetriever);
-
+ /**
+ * Instantiates the {@link ExecutionSlotAllocator}.
+ *
+ * @param context for slot allocation
+ * @return The instantiated slot allocator
+ */
+ ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
index 1824daf..b8a7f31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
@@ -45,10 +45,10 @@ class OneSlotPerExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorF
}
@Override
- public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) {
+ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
return new OneSlotPerExecutionSlotAllocator(
slotProvider,
- preferredLocationsRetriever,
+ context::getPreferredLocations,
slotWillBeOccupiedIndefinitely,
allocationTimeout);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
index 8842c7c..392ee67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
@@ -19,14 +19,9 @@
package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-
-import java.util.function.Function;
/**
* Factory for {@link SlotSharingExecutionSlotAllocator}.
@@ -36,12 +31,6 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
private final boolean slotWillBeOccupiedIndefinitely;
- private final SlotSharingStrategy slotSharingStrategy;
-
- private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
-
- private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
-
private final PhysicalSlotRequestBulkChecker bulkChecker;
private final Time allocationTimeout;
@@ -49,25 +38,23 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
SlotSharingExecutionSlotAllocatorFactory(
PhysicalSlotProvider slotProvider,
boolean slotWillBeOccupiedIndefinitely,
- SlotSharingStrategy slotSharingStrategy,
- Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
- Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
PhysicalSlotRequestBulkChecker bulkChecker,
Time allocationTimeout) {
this.slotProvider = slotProvider;
this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
- this.slotSharingStrategy = slotSharingStrategy;
- this.resourceProfileRetriever = resourceProfileRetriever;
- this.priorAllocationIdRetriever = priorAllocationIdRetriever;
this.bulkChecker = bulkChecker;
this.allocationTimeout = allocationTimeout;
}
@Override
- public ExecutionSlotAllocator createInstance(PreferredLocationsRetriever preferredLocationsRetriever) {
+ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
+ SlotSharingStrategy slotSharingStrategy = new LocalInputPreferredSlotSharingStrategy(
+ context.getSchedulingTopology(),
+ context.getLogicalSlotSharingGroups(),
+ context.getCoLocationGroups());
SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = new MergingSharedSlotProfileRetrieverFactory(
- preferredLocationsRetriever,
- priorAllocationIdRetriever);
+ context::getPreferredLocations,
+ context::getPriorAllocationId);
return new SlotSharingExecutionSlotAllocator(
slotProvider,
slotWillBeOccupiedIndefinitely,
@@ -75,6 +62,6 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
sharedSlotProfileRetrieverFactory,
bulkChecker,
allocationTimeout,
- resourceProfileRetriever);
+ context::getResourceProfile);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index e1c8d5a..aa3e9da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
@@ -35,9 +37,12 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -361,4 +366,44 @@ public class JobGraphTest extends TestLogger {
checkpointCoordinatorConfiguration,
null);
}
+
+ @Test
+ public void testGetSlotSharingGroups() {
+ final JobVertex v1 = new JobVertex("1");
+ final JobVertex v2 = new JobVertex("2");
+ final JobVertex v3 = new JobVertex("3");
+ final JobVertex v4 = new JobVertex("4");
+
+ final SlotSharingGroup group1 = new SlotSharingGroup();
+ v1.setSlotSharingGroup(group1);
+ v2.setSlotSharingGroup(group1);
+
+ final SlotSharingGroup group2 = new SlotSharingGroup();
+ v3.setSlotSharingGroup(group2);
+ v4.setSlotSharingGroup(group2);
+
+ final JobGraph jobGraph = new JobGraph(v1, v2, v3, v4);
+
+ assertThat(jobGraph.getSlotSharingGroups(), containsInAnyOrder(group1, group2));
+ }
+
+ @Test
+ public void testGetCoLocationGroupDescriptors() {
+ final JobVertex v1 = new JobVertex("1");
+ final JobVertex v2 = new JobVertex("2");
+ final JobVertex v3 = new JobVertex("3");
+ final JobVertex v4 = new JobVertex("4");
+
+ final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+ v1.setSlotSharingGroup(slotSharingGroup);
+ v2.setSlotSharingGroup(slotSharingGroup);
+ v1.setStrictlyCoLocatedWith(v2);
+
+ final JobGraph jobGraph = new JobGraph(v1, v2, v3, v4);
+
+ assertThat(jobGraph.getCoLocationGroupDescriptors(), hasSize(1));
+
+ final CoLocationGroupDesc onlyCoLocationGroupDesc = jobGraph.getCoLocationGroupDescriptors().iterator().next();
+ assertThat(onlyCoLocationGroupDesc.getVertices(), containsInAnyOrder(v1.getID(), v2.getID()));
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
index a996686..39865c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
@@ -37,7 +37,7 @@ public class TestExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
}
@Override
- public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever ignored) {
+ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
return testExecutionSlotAllocator;
}