You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:20 UTC
[03/11] flink git commit: [FLINK-7956] [flip6] Add support for queued
scheduling with slot sharing to SlotPool
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
new file mode 100644
index 0000000..d5460eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.SchedulerType.SCHEDULER;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.SchedulerType.SLOT_POOL;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+
+/**
+ * Test base for scheduler related test cases. The test are
+ * executed with the {@link Scheduler} and the {@link SlotPool}.
+ */
+public class SchedulerTestBase extends TestLogger {
+
+ protected TestingSlotProvider testingSlotProvider;
+
+ private SchedulerType schedulerType;
+
+ private RpcService rpcService;
+
+ enum SchedulerType {
+ SCHEDULER,
+ SLOT_POOL
+ }
+
+ @Parameterized.Parameters(name = "Scheduler type = {0}")
+ public static Collection<Object[]> schedulerTypes() {
+ return Arrays.asList(
+ new Object[]{SCHEDULER},
+ new Object[]{SLOT_POOL});
+ }
+
+ protected SchedulerTestBase(SchedulerType schedulerType) {
+ this.schedulerType = Preconditions.checkNotNull(schedulerType);
+ rpcService = null;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ switch (schedulerType) {
+ case SCHEDULER:
+ testingSlotProvider = new TestingSchedulerSlotProvider(
+ new Scheduler(
+ TestingUtils.defaultExecutionContext()));
+ break;
+ case SLOT_POOL:
+ rpcService = new TestingRpcService();
+ final JobID jobId = new JobID();
+ final TestingSlotPool slotPool = new TestingSlotPool(rpcService, jobId);
+ testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);
+
+ final JobMasterId jobMasterId = JobMasterId.generate();
+ final String jobManagerAddress = "localhost";
+ slotPool.start(jobMasterId, jobManagerAddress);
+ break;
+ }
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (testingSlotProvider != null) {
+ testingSlotProvider.shutdown();
+ testingSlotProvider = null;
+ }
+
+ if (rpcService != null) {
+ rpcService.stopService();
+ rpcService = null;
+ }
+ }
+
+ protected interface TestingSlotProvider extends SlotProvider {
+ TaskManagerLocation addTaskManager(int numberSlots);
+
+ void releaseTaskManager(ResourceID resourceId);
+
+ int getNumberOfAvailableSlots();
+
+ int getNumberOfLocalizedAssignments();
+
+ int getNumberOfNonLocalizedAssignments();
+
+ int getNumberOfUnconstrainedAssignments();
+
+ int getNumberOfHostLocalizedAssignments();
+
+ int getNumberOfSlots(SlotSharingGroup slotSharingGroup);
+
+ int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId);
+
+ void shutdown() throws Exception;
+ }
+
+ private static final class TestingSchedulerSlotProvider implements TestingSlotProvider {
+ private final Scheduler scheduler;
+
+ private TestingSchedulerSlotProvider(Scheduler scheduler) {
+ this.scheduler = Preconditions.checkNotNull(scheduler);
+ }
+
+ @Override
+ public CompletableFuture<LogicalSlot> allocateSlot(ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations) {
+ return scheduler.allocateSlot(task, allowQueued, preferredLocations);
+ }
+
+ @Override
+ public TaskManagerLocation addTaskManager(int numberSlots) {
+ final Instance instance = getRandomInstance(numberSlots);
+ scheduler.newInstanceAvailable(instance);
+
+ return instance.getTaskManagerLocation();
+ }
+
+ @Override
+ public void releaseTaskManager(ResourceID resourceId) {
+ final Instance instance = scheduler.getInstance(resourceId);
+
+ if (instance != null) {
+ scheduler.instanceDied(instance);
+ }
+ }
+
+ @Override
+ public int getNumberOfAvailableSlots() {
+ return scheduler.getNumberOfAvailableSlots();
+ }
+
+ @Override
+ public int getNumberOfLocalizedAssignments() {
+ return scheduler.getNumberOfLocalizedAssignments();
+ }
+
+ @Override
+ public int getNumberOfNonLocalizedAssignments() {
+ return scheduler.getNumberOfNonLocalizedAssignments();
+ }
+
+ @Override
+ public int getNumberOfUnconstrainedAssignments() {
+ return scheduler.getNumberOfUnconstrainedAssignments();
+ }
+
+ @Override
+ public int getNumberOfHostLocalizedAssignments() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
+ return slotSharingGroup.getTaskAssignment().getNumberOfSlots();
+ }
+
+ @Override
+ public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId) {
+ return slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexId);
+ }
+
+ @Override
+ public void shutdown() {
+ scheduler.shutdown();
+ }
+ }
+
+ private static final class TestingSlotPoolSlotProvider implements TestingSlotProvider {
+
+ private final TestingSlotPool slotPool;
+
+ private final SlotProvider slotProvider;
+
+ private final AtomicInteger numberOfLocalizedAssignments;
+
+ private final AtomicInteger numberOfNonLocalizedAssignments;
+
+ private final AtomicInteger numberOfUnconstrainedAssignments;
+
+ private final AtomicInteger numberOfHostLocalizedAssignments;
+
+ private TestingSlotPoolSlotProvider(TestingSlotPool slotPool) {
+ this.slotPool = Preconditions.checkNotNull(slotPool);
+ this.slotProvider = slotPool.getSlotProvider();
+
+ this.numberOfLocalizedAssignments = new AtomicInteger();
+ this.numberOfNonLocalizedAssignments = new AtomicInteger();
+ this.numberOfUnconstrainedAssignments = new AtomicInteger();
+ this.numberOfHostLocalizedAssignments = new AtomicInteger();
+ }
+
+ @Override
+ public TaskManagerLocation addTaskManager(int numberSlots) {
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final ResourceID resourceId = taskManagerLocation.getResourceID();
+ final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+ try {
+ slotPoolGateway.registerTaskManager(resourceId).get();
+ } catch (Exception e) {
+ throw new RuntimeException("Unexpected exception occurred. This indicates a programming bug.", e);
+ }
+
+ final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+ final Collection<SlotOffer> slotOffers = new ArrayList<>(numberSlots);
+
+ for (int i = 0; i < numberSlots; i++) {
+ final SlotOffer slotOffer = new SlotOffer(
+ new AllocationID(),
+ i,
+ ResourceProfile.UNKNOWN);
+
+ slotOffers.add(slotOffer);
+ }
+
+ final Collection<SlotOffer> acceptedSlotOffers;
+
+ try {
+ acceptedSlotOffers = slotPoolGateway.offerSlots(
+ taskManagerLocation,
+ taskManagerGateway,
+ slotOffers).get();
+ } catch (Exception e) {
+ throw new RuntimeException("Unexpected exception occurred. This indicates a programming bug.", e);
+ }
+
+ Preconditions.checkState(acceptedSlotOffers.size() == numberSlots);
+
+ return taskManagerLocation;
+ }
+
+ @Override
+ public void releaseTaskManager(ResourceID resourceId) {
+ try {
+ slotPool.releaseTaskManager(resourceId).get();
+ } catch (Exception e) {
+ throw new RuntimeException("Should not have happened.", e);
+ }
+ }
+
+ @Override
+ public int getNumberOfAvailableSlots() {
+ try {
+ return slotPool.getNumberOfAvailableSlots().get();
+ } catch (Exception e) {
+ throw new RuntimeException("Should not have happened.", e);
+ }
+ }
+
+ @Override
+ public int getNumberOfLocalizedAssignments() {
+ return numberOfLocalizedAssignments.get();
+ }
+
+ @Override
+ public int getNumberOfNonLocalizedAssignments() {
+ return numberOfNonLocalizedAssignments.get();
+ }
+
+ @Override
+ public int getNumberOfUnconstrainedAssignments() {
+ return numberOfUnconstrainedAssignments.get();
+ }
+
+ @Override
+ public int getNumberOfHostLocalizedAssignments() {
+ return numberOfHostLocalizedAssignments.get();
+ }
+
+ @Override
+ public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
+ try {
+ return slotPool.getNumberOfSharedSlots(slotSharingGroup.getSlotSharingGroupId()).get();
+ } catch (Exception e) {
+ throw new RuntimeException("Should not have happened.", e);
+ }
+ }
+
+ @Override
+ public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId) {
+ try {
+ return slotPool.getNumberOfAvailableSlotsForGroup(slotSharingGroup.getSlotSharingGroupId(), jobVertexId).get();
+ } catch (Exception e) {
+ throw new RuntimeException("Should not have happened.", e);
+ }
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ RpcUtils.terminateRpcEndpoint(slotPool, TestingUtils.TIMEOUT());
+ }
+
+ @Override
+ public CompletableFuture<LogicalSlot> allocateSlot(ScheduledUnit task, boolean allowQueued, Collection<TaskManagerLocation> preferredLocations) {
+ return slotProvider.allocateSlot(task, allowQueued, preferredLocations).thenApply(
+ (LogicalSlot logicalSlot) -> {
+ switch (logicalSlot.getLocality()) {
+ case LOCAL:
+ numberOfLocalizedAssignments.incrementAndGet();
+ break;
+ case UNCONSTRAINED:
+ numberOfUnconstrainedAssignments.incrementAndGet();
+ break;
+ case NON_LOCAL:
+ numberOfNonLocalizedAssignments.incrementAndGet();
+ break;
+ case HOST_LOCAL:
+ numberOfHostLocalizedAssignments.incrementAndGet();
+ break;
+ default:
+ // ignore
+ }
+
+ return logicalSlot;
+ });
+ }
+ }
+
+ private static final class TestingSlotPool extends SlotPool {
+
+ public TestingSlotPool(RpcService rpcService, JobID jobId) {
+ super(rpcService, jobId);
+ }
+
+ CompletableFuture<Integer> getNumberOfAvailableSlots() {
+ return callAsync(
+ () -> getAvailableSlots().size(),
+ TestingUtils.infiniteTime());
+ }
+
+ CompletableFuture<Integer> getNumberOfSharedSlots(SlotSharingGroupId slotSharingGroupId) {
+ return callAsync(
+ () -> {
+ final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
+
+ if (multiTaskSlotManager != null) {
+ return multiTaskSlotManager.getResolvedRootSlots().size();
+ } else {
+ throw new FlinkException("No MultiTaskSlotManager registered under " + slotSharingGroupId + '.');
+ }
+ },
+ TestingUtils.infiniteTime());
+ }
+
+ CompletableFuture<Integer> getNumberOfAvailableSlotsForGroup(SlotSharingGroupId slotSharingGroupId, JobVertexID jobVertexId) {
+ return callAsync(
+ () -> {
+ final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
+
+ if (multiTaskSlotManager != null) {
+ int availableSlots = 0;
+
+ for (SlotSharingManager.MultiTaskSlot multiTaskSlot : multiTaskSlotManager.getResolvedRootSlots()) {
+ if (!multiTaskSlot.contains(jobVertexId)) {
+ availableSlots++;
+ }
+ }
+
+ return availableSlots;
+ } else {
+ throw new FlinkException("No MultiTaskSlotmanager registered under " + slotSharingGroupId + '.');
+ }
+ },
+ TestingUtils.infiniteTime());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 98dca03..4186255 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -18,9 +18,18 @@
package org.apache.flink.runtime.jobmanager.scheduler;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -33,17 +42,9 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class SchedulerTestUtils {
@@ -83,9 +84,13 @@ public class SchedulerTestUtils {
public static Execution getDummyTask() {
+ ExecutionJobVertex executionJobVertex = mock(ExecutionJobVertex.class);
+
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.toString()).thenReturn("TEST-VERTEX");
+ when(vertex.getJobVertex()).thenReturn(executionJobVertex);
+ when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
Execution execution = mock(Execution.class);
when(execution.getVertex()).thenReturn(vertex);
@@ -117,11 +122,14 @@ public class SchedulerTestUtils {
}
public static Execution getTestVertex(Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures) {
+ ExecutionJobVertex executionJobVertex = mock(ExecutionJobVertex.class);
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.toString()).thenReturn("TEST-VERTEX");
+ when(vertex.getJobVertex()).thenReturn(executionJobVertex);
+ when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
Execution execution = mock(Execution.class);
when(execution.getVertex()).thenReturn(vertex);
@@ -130,9 +138,11 @@ public class SchedulerTestUtils {
return execution;
}
- public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
+ public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks, SlotSharingGroup slotSharingGroup) {
+ ExecutionJobVertex executionJobVertex = mock(ExecutionJobVertex.class);
ExecutionVertex vertex = mock(ExecutionVertex.class);
-
+
+ when(executionJobVertex.getSlotSharingGroup()).thenReturn(slotSharingGroup);
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Collections.emptyList());
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.getJobvertexId()).thenReturn(jid);
@@ -141,6 +151,7 @@ public class SchedulerTestUtils {
when(vertex.getMaxParallelism()).thenReturn(numTasks);
when(vertex.toString()).thenReturn("TEST-VERTEX");
when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("TEST-VERTEX");
+ when(vertex.getJobVertex()).thenReturn(executionJobVertex);
Execution execution = mock(Execution.class);
when(execution.getVertex()).thenReturn(vertex);
@@ -149,17 +160,26 @@ public class SchedulerTestUtils {
}
public static Execution getTestVertexWithLocation(
- JobVertexID jid, int taskIndex, int numTasks, TaskManagerLocation... locations) {
+ JobVertexID jid,
+ int taskIndex,
+ int numTasks,
+ SlotSharingGroup slotSharingGroup,
+ TaskManagerLocation... locations) {
+
+ ExecutionJobVertex executionJobVertex = mock(ExecutionJobVertex.class);
+
+ when(executionJobVertex.getSlotSharingGroup()).thenReturn(slotSharingGroup);
ExecutionVertex vertex = mock(ExecutionVertex.class);
- Collection<CompletableFuture<TaskManagerLocation>> preferrecLocationFutures = new ArrayList<>(locations.length);
+ Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(locations.length);
for (TaskManagerLocation location : locations) {
- preferrecLocationFutures.add(CompletableFuture.completedFuture(location));
+ preferredLocationFutures.add(CompletableFuture.completedFuture(location));
}
- when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferrecLocationFutures);
+ when(vertex.getJobVertex()).thenReturn(executionJobVertex);
+ when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.getJobvertexId()).thenReturn(jid);
when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
index 6d17ad0..add1ec2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
@@ -18,7 +18,8 @@
package org.apache.flink.runtime.jobmanager.slots;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
import java.util.concurrent.CompletableFuture;
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
index e7f9485..727c0b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
@@ -18,7 +18,8 @@
package org.apache.flink.runtime.jobmanager.slots;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
new file mode 100644
index 0000000..e20700e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Simple logical slot for testing purposes.
+ */
+public class TestingLogicalSlot implements LogicalSlot {
+
+ private final TaskManagerLocation taskManagerLocation;
+
+ private final TaskManagerGateway taskManagerGateway;
+
+ private final AtomicReference<Payload> payloadReference;
+
+ private final int slotNumber;
+
+ private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+
+ private final AllocationID allocationId;
+
+ private final SlotRequestId slotRequestId;
+
+ private final SlotSharingGroupId slotSharingGroupId;
+
+ public TestingLogicalSlot() {
+ this(
+ new LocalTaskManagerLocation(),
+ new SimpleAckingTaskManagerGateway(),
+ 0,
+ new AllocationID(),
+ new SlotRequestId(),
+ new SlotSharingGroupId());
+ }
+
+ public TestingLogicalSlot(
+ TaskManagerLocation taskManagerLocation,
+ TaskManagerGateway taskManagerGateway,
+ int slotNumber,
+ AllocationID allocationId,
+ SlotRequestId slotRequestId,
+ SlotSharingGroupId slotSharingGroupId) {
+ this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+ this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
+ this.payloadReference = new AtomicReference<>();
+ this.slotNumber = slotNumber;
+ this.allocationId = Preconditions.checkNotNull(allocationId);
+ this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
+ this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
+ }
+
+ @Override
+ public TaskManagerLocation getTaskManagerLocation() {
+ return taskManagerLocation;
+ }
+
+ @Override
+ public TaskManagerGateway getTaskManagerGateway() {
+ return taskManagerGateway;
+ }
+
+ @Override
+ public Locality getLocality() {
+ return Locality.UNKNOWN;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return !releaseFuture.isDone();
+ }
+
+ @Override
+ public boolean tryAssignPayload(Payload payload) {
+ return payloadReference.compareAndSet(null, payload);
+ }
+
+ @Nullable
+ @Override
+ public Payload getPayload() {
+ return payloadReference.get();
+ }
+
+ @Override
+ public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
+ releaseFuture.complete(null);
+
+ return releaseFuture;
+ }
+
+ @Override
+ public int getPhysicalSlotNumber() {
+ return slotNumber;
+ }
+
+ @Override
+ public AllocationID getAllocationId() {
+ return allocationId;
+ }
+
+ @Override
+ public SlotRequestId getSlotRequestId() {
+ return slotRequestId;
+ }
+
+ @Nullable
+ @Override
+ public SlotSharingGroupId getSlotSharingGroupId() {
+ return slotSharingGroupId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingPayload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingPayload.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingPayload.java
new file mode 100644
index 0000000..a59f765
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingPayload.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Simple payload implementation for testing purposes.
+ */
+public class TestingPayload implements LogicalSlot.Payload {
+
+ private final CompletableFuture<?> terminationFuture;
+
+ public TestingPayload() {
+ this.terminationFuture = new CompletableFuture<>();
+ }
+
+
+ @Override
+ public void fail(Throwable cause) {
+ terminationFuture.complete(null);
+ }
+
+ @Override
+ public CompletableFuture<?> getTerminalStateFuture() {
+ return terminationFuture;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
new file mode 100644
index 0000000..4dee924
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class AllocatedSlotsTest extends TestLogger {
+
+ @Test
+ public void testOperations() throws Exception {
+ SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
+
+ final AllocationID allocation1 = new AllocationID();
+ final SlotRequestId slotRequestID = new SlotRequestId();
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final ResourceID resource1 = taskManagerLocation.getResourceID();
+ final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation);
+
+ allocatedSlots.add(slotRequestID, slot1);
+
+ assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+ assertTrue(allocatedSlots.containResource(resource1));
+
+ assertEquals(slot1, allocatedSlots.get(allocation1));
+ assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource1).size());
+ assertEquals(1, allocatedSlots.size());
+
+ final AllocationID allocation2 = new AllocationID();
+ final SlotRequestId slotRequestID2 = new SlotRequestId();
+ final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation);
+
+ allocatedSlots.add(slotRequestID2, slot2);
+
+ assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
+ assertTrue(allocatedSlots.containResource(resource1));
+
+ assertEquals(slot1, allocatedSlots.get(allocation1));
+ assertEquals(slot2, allocatedSlots.get(allocation2));
+ assertEquals(2, allocatedSlots.getSlotsForTaskManager(resource1).size());
+ assertEquals(2, allocatedSlots.size());
+
+ final AllocationID allocation3 = new AllocationID();
+ final SlotRequestId slotRequestID3 = new SlotRequestId();
+ final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
+ final ResourceID resource2 = taskManagerLocation2.getResourceID();
+ final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2);
+
+ allocatedSlots.add(slotRequestID3, slot3);
+
+ assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
+ assertTrue(allocatedSlots.containResource(resource1));
+ assertTrue(allocatedSlots.containResource(resource2));
+
+ assertEquals(slot1, allocatedSlots.get(allocation1));
+ assertEquals(slot2, allocatedSlots.get(allocation2));
+ assertEquals(slot3, allocatedSlots.get(allocation3));
+ assertEquals(2, allocatedSlots.getSlotsForTaskManager(resource1).size());
+ assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
+ assertEquals(3, allocatedSlots.size());
+
+ allocatedSlots.remove(slot2.getAllocationId());
+
+ assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+ assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
+ assertTrue(allocatedSlots.containResource(resource1));
+ assertTrue(allocatedSlots.containResource(resource2));
+
+ assertEquals(slot1, allocatedSlots.get(allocation1));
+ assertNull(allocatedSlots.get(allocation2));
+ assertEquals(slot3, allocatedSlots.get(allocation3));
+ assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource1).size());
+ assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
+ assertEquals(2, allocatedSlots.size());
+
+ allocatedSlots.remove(slot1.getAllocationId());
+
+ assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
+ assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
+ assertFalse(allocatedSlots.containResource(resource1));
+ assertTrue(allocatedSlots.containResource(resource2));
+
+ assertNull(allocatedSlots.get(allocation1));
+ assertNull(allocatedSlots.get(allocation2));
+ assertEquals(slot3, allocatedSlots.get(allocation3));
+ assertEquals(0, allocatedSlots.getSlotsForTaskManager(resource1).size());
+ assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
+ assertEquals(1, allocatedSlots.size());
+
+ allocatedSlots.remove(slot3.getAllocationId());
+
+ assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
+ assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+ assertFalse(allocatedSlots.contains(slot3.getAllocationId()));
+ assertFalse(allocatedSlots.containResource(resource1));
+ assertFalse(allocatedSlots.containResource(resource2));
+
+ assertNull(allocatedSlots.get(allocation1));
+ assertNull(allocatedSlots.get(allocation2));
+ assertNull(allocatedSlots.get(allocation3));
+ assertEquals(0, allocatedSlots.getSlotsForTaskManager(resource1).size());
+ assertEquals(0, allocatedSlots.getSlotsForTaskManager(resource2).size());
+ assertEquals(0, allocatedSlots.size());
+ }
+
+ private AllocatedSlot createSlot(final AllocationID allocationId, final TaskManagerLocation taskManagerLocation) {
+ return new AllocatedSlot(
+ allocationId,
+ taskManagerLocation,
+ 0,
+ ResourceProfile.UNKNOWN,
+ new SimpleAckingTaskManagerGateway());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
new file mode 100644
index 0000000..4835c57
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AvailableSlotsTest extends TestLogger {
+
+ static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
+
+ static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
+
+ @Test
+ public void testAddAndRemove() throws Exception {
+ SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
+
+ final ResourceID resource1 = new ResourceID("resource1");
+ final ResourceID resource2 = new ResourceID("resource2");
+
+ final AllocatedSlot slot1 = createAllocatedSlot(resource1);
+ final AllocatedSlot slot2 = createAllocatedSlot(resource1);
+ final AllocatedSlot slot3 = createAllocatedSlot(resource2);
+
+ availableSlots.add(slot1, 1L);
+ availableSlots.add(slot2, 2L);
+ availableSlots.add(slot3, 3L);
+
+ assertEquals(3, availableSlots.size());
+ assertTrue(availableSlots.contains(slot1.getAllocationId()));
+ assertTrue(availableSlots.contains(slot2.getAllocationId()));
+ assertTrue(availableSlots.contains(slot3.getAllocationId()));
+ assertTrue(availableSlots.containsTaskManager(resource1));
+ assertTrue(availableSlots.containsTaskManager(resource2));
+
+ availableSlots.removeAllForTaskManager(resource1);
+
+ assertEquals(1, availableSlots.size());
+ assertFalse(availableSlots.contains(slot1.getAllocationId()));
+ assertFalse(availableSlots.contains(slot2.getAllocationId()));
+ assertTrue(availableSlots.contains(slot3.getAllocationId()));
+ assertFalse(availableSlots.containsTaskManager(resource1));
+ assertTrue(availableSlots.containsTaskManager(resource2));
+
+ availableSlots.removeAllForTaskManager(resource2);
+
+ assertEquals(0, availableSlots.size());
+ assertFalse(availableSlots.contains(slot1.getAllocationId()));
+ assertFalse(availableSlots.contains(slot2.getAllocationId()));
+ assertFalse(availableSlots.contains(slot3.getAllocationId()));
+ assertFalse(availableSlots.containsTaskManager(resource1));
+ assertFalse(availableSlots.containsTaskManager(resource2));
+ }
+
+ @Test
+ public void testPollFreeSlot() {
+ SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
+
+ final ResourceID resource1 = new ResourceID("resource1");
+ final AllocatedSlot slot1 = createAllocatedSlot(resource1);
+
+ availableSlots.add(slot1, 1L);
+
+ assertEquals(1, availableSlots.size());
+ assertTrue(availableSlots.contains(slot1.getAllocationId()));
+ assertTrue(availableSlots.containsTaskManager(resource1));
+
+ assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, null));
+
+ SlotAndLocality slotAndLocality = availableSlots.poll(DEFAULT_TESTING_PROFILE, null);
+ assertEquals(slot1, slotAndLocality.getSlot());
+ assertEquals(0, availableSlots.size());
+ assertFalse(availableSlots.contains(slot1.getAllocationId()));
+ assertFalse(availableSlots.containsTaskManager(resource1));
+ }
+
+ static AllocatedSlot createAllocatedSlot(final ResourceID resourceId) {
+ TaskManagerLocation mockTaskManagerLocation = mock(TaskManagerLocation.class);
+ when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
+
+ TaskManagerGateway mockTaskManagerGateway = mock(TaskManagerGateway.class);
+
+ return new AllocatedSlot(
+ new AllocationID(),
+ mockTaskManagerLocation,
+ 0,
+ DEFAULT_TESTING_PROFILE,
+ mockTaskManagerGateway);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
new file mode 100644
index 0000000..7454e3e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link CoLocationConstraint} with the {@link SlotPool}.
+ */
+public class SlotPoolCoLocationTest extends SlotPoolSchedulingTestBase {
+
+ /**
+ * Tests the scheduling of two tasks with a parallelism of 2 and a co-location constraint.
+ */
+ @Test
+ public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, InterruptedException {
+ final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
+
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+ CoLocationGroup group = new CoLocationGroup();
+ CoLocationConstraint coLocationConstraint1 = group.getLocationConstraint(0);
+ CoLocationConstraint coLocationConstraint2 = group.getLocationConstraint(1);
+
+ final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+ JobVertexID jobVertexId1 = new JobVertexID();
+ JobVertexID jobVertexId2 = new JobVertexID();
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture11 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId1,
+ slotSharingGroupId,
+ coLocationConstraint1),
+ true,
+ Collections.emptyList());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture22 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId2,
+ slotSharingGroupId,
+ coLocationConstraint2),
+ true,
+ Collections.emptyList());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture12 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId2,
+ slotSharingGroupId,
+ coLocationConstraint1),
+ true,
+ Collections.emptyList());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture21 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId1,
+ slotSharingGroupId,
+ coLocationConstraint2),
+ true,
+ Collections.emptyList());
+
+ final AllocationID allocationId1 = allocationIds.take();
+ final AllocationID allocationId2 = allocationIds.take();
+
+ CompletableFuture<Boolean> slotOfferFuture1 = slotPoolGateway.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId1,
+ 0,
+ ResourceProfile.UNKNOWN));
+
+ CompletableFuture<Boolean> slotOfferFuture2 = slotPoolGateway.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId2,
+ 0,
+ ResourceProfile.UNKNOWN));
+
+ assertTrue(slotOfferFuture1.get());
+ assertTrue(slotOfferFuture2.get());
+
+ LogicalSlot logicalSlot11 = logicalSlotFuture11.get();
+ LogicalSlot logicalSlot12 = logicalSlotFuture12.get();
+ LogicalSlot logicalSlot21 = logicalSlotFuture21.get();
+ LogicalSlot logicalSlot22 = logicalSlotFuture22.get();
+
+ assertEquals(logicalSlot11.getAllocationId(), logicalSlot12.getAllocationId());
+ assertEquals(logicalSlot21.getAllocationId(), logicalSlot22.getAllocationId());
+ assertNotEquals(logicalSlot11.getAllocationId(), logicalSlot21.getAllocationId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
new file mode 100644
index 0000000..2d862c5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.pattern.AskTimeoutException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the SlotPool using a proper RPC setup.
+ */
+public class SlotPoolRpcTest extends TestLogger {
+
+ private static RpcService rpcService;
+
+ private static final Time timeout = Time.seconds(10L);
+
+ // ------------------------------------------------------------------------
+ // setup
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void setup() {
+ ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+ rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ if (rpcService != null) {
+ rpcService.stopService();
+ rpcService = null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testSlotAllocationNoResourceManager() throws Exception {
+ final JobID jid = new JobID();
+
+ final SlotPool pool = new SlotPool(
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ Time.milliseconds(10L) // this is the timeout for the request tested here
+ );
+
+ try {
+ pool.start(JobMasterId.generate(), "foobar");
+
+ CompletableFuture<LogicalSlot> future = pool.allocateSlot(
+ new SlotRequestId(),
+ new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+ DEFAULT_TESTING_PROFILE,
+ Collections.emptyList(),
+ true,
+ TestingUtils.infiniteTime());
+
+ try {
+ future.get();
+ fail("We expected an ExecutionException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof NoResourceAvailableException);
+ }
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
+ }
+ }
+
+ @Test
+ public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
+ final JobID jid = new JobID();
+
+ final TestingSlotPool pool = new TestingSlotPool(
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
+
+ try {
+ pool.start(JobMasterId.generate(), "foobar");
+ SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+ SlotRequestId requestId = new SlotRequestId();
+ CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
+ requestId,
+ new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+ DEFAULT_TESTING_PROFILE,
+ Collections.emptyList(),
+ true,
+ Time.milliseconds(10L));
+
+ try {
+ future.get();
+ fail("We expected a AskTimeoutException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
+
+ slotPoolGateway.releaseSlot(requestId, null, null).get();
+
+ assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
+ }
+ }
+
+ @Test
+ public void testCancelSlotAllocationWithResourceManager() throws Exception {
+ final JobID jid = new JobID();
+
+ final TestingSlotPool pool = new TestingSlotPool(
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
+
+ try {
+ pool.start(JobMasterId.generate(), "foobar");
+ SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+ ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ pool.connectToResourceManager(resourceManagerGateway);
+
+ SlotRequestId requestId = new SlotRequestId();
+ CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
+ requestId,
+ new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+ DEFAULT_TESTING_PROFILE,
+ Collections.emptyList(),
+ true,
+ Time.milliseconds(10L));
+
+ try {
+ future.get();
+ fail("We expected a AskTimeoutException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
+
+ slotPoolGateway.releaseSlot(requestId, null, null).get();
+ assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
+ }
+ }
+
+ /**
+ * Tests that allocated slots are not cancelled.
+ */
+ @Test
+ public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception {
+ final JobID jid = new JobID();
+
+ final TestingSlotPool pool = new TestingSlotPool(
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
+
+ try {
+ pool.start(JobMasterId.generate(), "foobar");
+ SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
+
+ final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+
+ TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ resourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+ pool.connectToResourceManager(resourceManagerGateway);
+
+ SlotRequestId requestId = new SlotRequestId();
+ CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
+ requestId,
+ new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+ DEFAULT_TESTING_PROFILE,
+ Collections.emptyList(),
+ true,
+ Time.milliseconds(10L));
+
+ try {
+ future.get();
+ fail("We expected a AskTimeoutException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ AllocationID allocationId = allocationIdFuture.get();
+ final SlotOffer slotOffer = new SlotOffer(
+ allocationId,
+ 0,
+ DEFAULT_TESTING_PROFILE);
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
+
+ assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+
+ assertTrue(pool.containsAllocatedSlot(allocationId).get());
+
+ pool.releaseSlot(requestId, null, null).get();
+
+ assertFalse(pool.containsAllocatedSlot(allocationId).get());
+ assertTrue(pool.containsAvailableSlot(allocationId).get());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
+ }
+ }
+
+ /**
+ * This case make sure when allocateSlot in ProviderAndOwner timeout,
+ * it will automatically call cancelSlotAllocation as will inject future.whenComplete in ProviderAndOwner.
+ */
+ @Test
+ public void testProviderAndOwner() throws Exception {
+ final JobID jid = new JobID();
+
+ final TestingSlotPool pool = new TestingSlotPool(
+ rpcService,
+ jid,
+ SystemClock.getInstance(),
+ Time.milliseconds(10L),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
+
+ final CompletableFuture<SlotRequestId> releaseSlotFuture = new CompletableFuture<>();
+
+ pool.setReleaseSlotConsumer(
+ slotRequestID -> releaseSlotFuture.complete(slotRequestID));
+
+ try {
+ pool.start(JobMasterId.generate(), "foobar");
+ ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ pool.connectToResourceManager(resourceManagerGateway);
+
+ ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
+
+ // test the pending request is clear when timed out
+ CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot(
+ mockScheduledUnit,
+ true,
+ Collections.emptyList());
+
+ try {
+ future.get();
+ fail("We expected a AskTimeoutException.");
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+ }
+
+ // wait for the cancel call on the SlotPool
+ releaseSlotFuture.get();
+
+ assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(pool, timeout);
+ }
+ }
+
+ /**
+ * Testing SlotPool which exposes internal state via some testing methods.
+ */
+ private static final class TestingSlotPool extends SlotPool {
+
+ private volatile Consumer<SlotRequestId> releaseSlotConsumer;
+
+ public TestingSlotPool(
+ RpcService rpcService,
+ JobID jobId,
+ Clock clock,
+ Time slotRequestTimeout,
+ Time resourceManagerAllocationTimeout,
+ Time resourceManagerRequestTimeout) {
+ super(
+ rpcService,
+ jobId,
+ clock,
+ slotRequestTimeout,
+ resourceManagerAllocationTimeout,
+ resourceManagerRequestTimeout);
+
+ releaseSlotConsumer = null;
+ }
+
+ public void setReleaseSlotConsumer(Consumer<SlotRequestId> releaseSlotConsumer) {
+ this.releaseSlotConsumer = Preconditions.checkNotNull(releaseSlotConsumer);
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> releaseSlot(
+ SlotRequestId slotRequestId,
+ @Nullable SlotSharingGroupId slotSharingGroupId,
+ @Nullable Throwable cause) {
+ final Consumer<SlotRequestId> currentReleaseSlotConsumer = releaseSlotConsumer;
+
+ if (currentReleaseSlotConsumer != null) {
+ currentReleaseSlotConsumer.accept(slotRequestId);
+ }
+
+ return super.releaseSlot(slotRequestId, slotSharingGroupId, cause);
+ }
+
+ CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {
+ return callAsync(
+ () -> getAllocatedSlots().contains(allocationId),
+ timeout);
+ }
+
+ CompletableFuture<Boolean> containsAvailableSlot(AllocationID allocationId) {
+ return callAsync(
+ () -> getAvailableSlots().contains(allocationId),
+ timeout);
+ }
+
+ CompletableFuture<Integer> getNumberOfPendingRequests() {
+ return callAsync(
+ () -> getPendingRequests().size(),
+ timeout);
+ }
+
+ CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
+ return callAsync(
+ () -> getWaitingForResourceManager().size(),
+ timeout);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
new file mode 100644
index 0000000..31be1ae
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test base for {@link SlotPool} related scheduling test cases.
+ */
+public class SlotPoolSchedulingTestBase extends TestLogger {
+
+ private static final JobID jobId = new JobID();
+
+ private static final JobMasterId jobMasterId = new JobMasterId();
+
+ private static final String jobMasterAddress = "foobar";
+
+ private static TestingRpcService testingRpcService;
+
+ protected SlotPool slotPool;
+
+ protected SlotPoolGateway slotPoolGateway;
+
+ protected SlotProvider slotProvider;
+
+ protected TestingResourceManagerGateway testingResourceManagerGateway;
+
+ @BeforeClass
+ public static void setup() {
+ testingRpcService = new TestingRpcService();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (testingRpcService != null) {
+ testingRpcService.stopService();
+ testingRpcService = null;
+ }
+ }
+
+ @Before
+ public void setupBefore() throws Exception {
+ testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+ slotPool = new SlotPool(
+ testingRpcService,
+ jobId);
+
+ slotPool.start(jobMasterId, jobMasterAddress);
+
+ slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+ slotProvider = slotPool.getSlotProvider();
+
+ slotPool.connectToResourceManager(testingResourceManagerGateway);
+ }
+
+ @After
+ public void teardownAfter() throws InterruptedException, ExecutionException, TimeoutException {
+ if (slotPool != null) {
+ RpcUtils.terminateRpcEndpoint(slotPool, TestingUtils.TIMEOUT());
+ slotPool = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
new file mode 100644
index 0000000..bb6c2b7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test cases for slot sharing with the {@link SlotPool}.
+ */
+public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase {
+
+ @Test
+ public void testSingleQueuedSharedSlotScheduling() throws Exception {
+ final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+ LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+ SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+ CompletableFuture<LogicalSlot> logicalSlotFuture = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ new JobVertexID(),
+ slotSharingGroupId,
+ null),
+ true,
+ Collections.emptyList());
+
+ assertFalse(logicalSlotFuture.isDone());
+
+ final AllocationID allocationId = allocationIdFuture.get();
+
+ CompletableFuture<Boolean> booleanCompletableFuture = slotPoolGateway.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId,
+ 0,
+ ResourceProfile.UNKNOWN));
+
+ assertTrue(booleanCompletableFuture.get());
+
+ final LogicalSlot logicalSlot = logicalSlotFuture.get();
+
+ assertEquals(slotSharingGroupId, logicalSlot.getSlotSharingGroupId());
+ }
+
+ /**
+ * Tests that returned slot futures are failed if the allocation request is failed.
+ */
+ @Test
+ public void testFailingQueuedSharedSlotScheduling() throws ExecutionException, InterruptedException {
+ final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ new JobVertexID(),
+ new SlotSharingGroupId(),
+ null),
+ true,
+ Collections.emptyList());
+
+ final AllocationID allocationId = allocationIdFuture.get();
+
+ // this should fail the returned logical slot future
+ slotPoolGateway.failAllocation(allocationId, new FlinkException("Testing Exception"));
+
+ try {
+ logicalSlotFuture.get();
+ fail("The slot future should have failed.");
+ } catch (ExecutionException ee) {
+ assertTrue(ExceptionUtils.findThrowable(ee, FlinkException.class).isPresent());
+ }
+ }
+
+ /**
+ * Tests queued slot scheduling with a single slot sharing group
+ */
+ @Test
+ public void testQueuedSharedSlotScheduling() throws InterruptedException, ExecutionException {
+ final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+ final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+ final JobVertexID jobVertexId1 = new JobVertexID();
+ final JobVertexID jobVertexId2 = new JobVertexID();
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId1,
+ slotSharingGroupId,
+ null),
+ true,
+ Collections.emptyList());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId2,
+ slotSharingGroupId,
+ null),
+ true,
+ Collections.emptyList());
+
+ assertFalse(logicalSlotFuture1.isDone());
+ assertFalse(logicalSlotFuture2.isDone());
+
+ final AllocationID allocationId1 = allocationIds.take();
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId1,
+ slotSharingGroupId,
+ null),
+ true,
+ Collections.emptyList());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture4 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId2,
+ slotSharingGroupId,
+ null),
+ true,
+ Collections.emptyList());
+
+ assertFalse(logicalSlotFuture3.isDone());
+ assertFalse(logicalSlotFuture4.isDone());
+
+ final AllocationID allocationId2 = allocationIds.take();
+
+ // this should fulfill the first two slot futures
+ CompletableFuture<Boolean> offerFuture = slotPoolGateway.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId1,
+ 0,
+ ResourceProfile.UNKNOWN));
+
+ assertTrue(offerFuture.get());
+
+ LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+ LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+
+ assertEquals(logicalSlot1.getTaskManagerLocation(), logicalSlot2.getTaskManagerLocation());
+ assertEquals(allocationId1, logicalSlot1.getAllocationId());
+ assertEquals(allocationId1, logicalSlot2.getAllocationId());
+
+ assertFalse(logicalSlotFuture3.isDone());
+ assertFalse(logicalSlotFuture4.isDone());
+
+ // release the shared slot by releasing the individual tasks
+ logicalSlot1.releaseSlot(null);
+ logicalSlot2.releaseSlot(null);
+
+ LogicalSlot logicalSlot3 = logicalSlotFuture3.get();
+ LogicalSlot logicalSlot4 = logicalSlotFuture4.get();
+
+ assertEquals(logicalSlot3.getTaskManagerLocation(), logicalSlot4.getTaskManagerLocation());
+ assertEquals(allocationId1, logicalSlot3.getAllocationId());
+ assertEquals(allocationId1, logicalSlot4.getAllocationId());
+ }
+
+ /**
+ * Tests queued slot scheduling with multiple slot sharing groups.
+ */
+ @Test
+ public void testQueuedMultipleSlotSharingGroups() throws ExecutionException, InterruptedException {
+ final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(4);
+
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final SlotSharingGroupId slotSharingGroupId1 = new SlotSharingGroupId();
+ final SlotSharingGroupId slotSharingGroupId2 = new SlotSharingGroupId();
+ final JobVertexID jobVertexId1 = new JobVertexID();
+ final JobVertexID jobVertexId2 = new JobVertexID();
+ final JobVertexID jobVertexId3 = new JobVertexID();
+ final JobVertexID jobVertexId4 = new JobVertexID();
+
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId1,
+ slotSharingGroupId1,
+ null),
+ true,
+ Collections.emptyList());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId2,
+ slotSharingGroupId1,
+ null),
+ true,
+ Collections.emptyList());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId3,
+ slotSharingGroupId2,
+ null),
+ true,
+ Collections.emptyList());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture4 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId4,
+ slotSharingGroupId2,
+ null),
+ true,
+ Collections.emptyList());
+
+ assertFalse(logicalSlotFuture1.isDone());
+ assertFalse(logicalSlotFuture2.isDone());
+ assertFalse(logicalSlotFuture3.isDone());
+ assertFalse(logicalSlotFuture4.isDone());
+
+ // we expect two slot requests
+ final AllocationID allocationId1 = allocationIds.take();
+ final AllocationID allocationId2 = allocationIds.take();
+
+ CompletableFuture<Boolean> offerFuture1 = slotPoolGateway.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId1,
+ 0,
+ ResourceProfile.UNKNOWN));
+
+ CompletableFuture<Boolean> offerFuture2 = slotPoolGateway.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId2,
+ 0,
+ ResourceProfile.UNKNOWN));
+
+ assertTrue(offerFuture1.get());
+ assertTrue(offerFuture2.get());
+
+ LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+ LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+ LogicalSlot logicalSlot3 = logicalSlotFuture3.get();
+ LogicalSlot logicalSlot4 = logicalSlotFuture4.get();
+
+ assertEquals(logicalSlot1.getTaskManagerLocation(), logicalSlot2.getTaskManagerLocation());
+ assertEquals(logicalSlot3.getTaskManagerLocation(), logicalSlot4.getTaskManagerLocation());
+
+ assertEquals(allocationId1, logicalSlot1.getAllocationId());
+ assertEquals(allocationId2, logicalSlot3.getAllocationId());
+ }
+
+}