You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:23 UTC

[06/11] flink git commit: [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
deleted file mode 100644
index 60e1d34..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.clock.Clock;
-import org.apache.flink.runtime.util.clock.SystemClock;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorSystem;
-import akka.pattern.AskTimeoutException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
-
-import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the SlotPool using a proper RPC setup.
- */
-public class SlotPoolRpcTest extends TestLogger {
-
-	private static RpcService rpcService;
-
-	private static final Time timeout = Time.seconds(10L);
-
-	// ------------------------------------------------------------------------
-	//  setup
-	// ------------------------------------------------------------------------
-
-	@BeforeClass
-	public static void setup() {
-		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-		rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
-	}
-
-	@AfterClass
-	public static  void shutdown() {
-		if (rpcService != null) {
-			rpcService.stopService();
-			rpcService = null;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  tests
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testSlotAllocationNoResourceManager() throws Exception {
-		final JobID jid = new JobID();
-		
-		final SlotPool pool = new SlotPool(
-			rpcService,
-			jid,
-			SystemClock.getInstance(),
-			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime(),
-			Time.milliseconds(10L) // this is the timeout for the request tested here
-		);
-
-		try {
-			pool.start(JobMasterId.generate(), "foobar");
-
-			CompletableFuture<LogicalSlot> future = pool.allocateSlot(
-				new SlotRequestID(),
-				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
-				DEFAULT_TESTING_PROFILE,
-				Collections.emptyList(),
-				TestingUtils.infiniteTime());
-
-			try {
-				future.get();
-				fail("We expected an ExecutionException.");
-			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof NoResourceAvailableException);
-			}
-		} finally {
-			RpcUtils.terminateRpcEndpoint(pool, timeout);
-		}
-	}
-
-	@Test
-	public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
-		final JobID jid = new JobID();
-
-		final TestingSlotPool pool = new TestingSlotPool(
-			rpcService,
-			jid,
-			SystemClock.getInstance(),
-			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime());
-
-		try {
-			pool.start(JobMasterId.generate(), "foobar");
-			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
-			SlotRequestID requestId = new SlotRequestID();
-			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
-				requestId,
-				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
-				DEFAULT_TESTING_PROFILE,
-				Collections.emptyList(),
-				Time.milliseconds(10L));
-
-			try {
-				future.get();
-				fail("We expected a AskTimeoutException.");
-			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
-			}
-
-			assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
-
-			slotPoolGateway.cancelSlotRequest(requestId).get();
-
-			assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
-		} finally {
-			RpcUtils.terminateRpcEndpoint(pool, timeout);
-		}
-	}
-
-	@Test
-	public void testCancelSlotAllocationWithResourceManager() throws Exception {
-		final JobID jid = new JobID();
-
-		final TestingSlotPool pool = new TestingSlotPool(
-			rpcService,
-			jid,
-			SystemClock.getInstance(),
-			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime());
-
-		try {
-			pool.start(JobMasterId.generate(), "foobar");
-			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
-			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-			pool.connectToResourceManager(resourceManagerGateway);
-
-			SlotRequestID requestId = new SlotRequestID();
-			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
-				requestId,
-				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
-				DEFAULT_TESTING_PROFILE,
-				Collections.emptyList(),
-				Time.milliseconds(10L));
-
-			try {
-				future.get();
-				fail("We expected a AskTimeoutException.");
-			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
-			}
-
-			assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
-
-			slotPoolGateway.cancelSlotRequest(requestId).get();
-			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
-		} finally {
-			RpcUtils.terminateRpcEndpoint(pool, timeout);
-		}
-	}
-
-	/**
-	 * Tests that allocated slots are not cancelled.
-	 */
-	@Test
-	public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception {
-		final JobID jid = new JobID();
-
-		final TestingSlotPool pool = new TestingSlotPool(
-			rpcService,
-			jid,
-			SystemClock.getInstance(),
-			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime());
-
-		try {
-			pool.start(JobMasterId.generate(), "foobar");
-			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
-
-			final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
-
-			TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-			resourceManagerGateway.setRequestSlotConsumer(
-				(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
-
-			pool.connectToResourceManager(resourceManagerGateway);
-
-			SlotRequestID requestId = new SlotRequestID();
-			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
-				requestId,
-				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
-				DEFAULT_TESTING_PROFILE,
-				Collections.emptyList(),
-				Time.milliseconds(10L));
-
-			try {
-				future.get();
-				fail("We expected a AskTimeoutException.");
-			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
-			}
-
-			AllocationID allocationId = allocationIdFuture.get();
-			final SlotOffer slotOffer = new SlotOffer(
-				allocationId,
-				0,
-				DEFAULT_TESTING_PROFILE);
-			final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-			final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
-
-			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
-
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
-			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
-
-			assertTrue(pool.containsAllocatedSlot(allocationId).get());
-
-			pool.cancelSlotRequest(requestId).get();
-
-			assertFalse(pool.containsAllocatedSlot(allocationId).get());
-			assertTrue(pool.containsAvailableSlot(allocationId).get());
-		} finally {
-			RpcUtils.terminateRpcEndpoint(pool, timeout);
-		}
-	}
-
-	/**
-	 * This case make sure when allocateSlot in ProviderAndOwner timeout,
-	 * it will automatically call cancelSlotAllocation as will inject future.whenComplete in ProviderAndOwner.
-	 */
-	@Test
-	public void testProviderAndOwner() throws Exception {
-		final JobID jid = new JobID();
-
-		final TestingSlotPool pool = new TestingSlotPool(
-			rpcService,
-			jid,
-			SystemClock.getInstance(),
-			Time.milliseconds(10L),
-			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime());
-
-		final CompletableFuture<SlotRequestID> cancelFuture = new CompletableFuture<>();
-
-		pool.setCancelSlotAllocationConsumer(
-			slotRequestID -> cancelFuture.complete(slotRequestID));
-
-		try {
-			pool.start(JobMasterId.generate(), "foobar");
-			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-			pool.connectToResourceManager(resourceManagerGateway);
-
-			ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
-
-			// test the pending request is clear when timed out
-			CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot(
-				mockScheduledUnit,
-				true,
-				Collections.emptyList());
-
-			try {
-				future.get();
-				fail("We expected a AskTimeoutException.");
-			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
-			}
-
-			// wait for the cancel call on the SlotPool
-			cancelFuture.get();
-
-			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
-		} finally {
-			RpcUtils.terminateRpcEndpoint(pool, timeout);
-		}
-	}
-
-	/**
-	 * Testing SlotPool which exposes internal state via some testing methods.
-	 */
-	private static final class TestingSlotPool extends SlotPool {
-
-		private volatile Consumer<SlotRequestID> cancelSlotAllocationConsumer;
-
-		public TestingSlotPool(
-				RpcService rpcService,
-				JobID jobId,
-				Clock clock,
-				Time slotRequestTimeout,
-				Time resourceManagerAllocationTimeout,
-				Time resourceManagerRequestTimeout) {
-			super(
-				rpcService,
-				jobId,
-				clock,
-				slotRequestTimeout,
-				resourceManagerAllocationTimeout,
-				resourceManagerRequestTimeout);
-
-			cancelSlotAllocationConsumer = null;
-		}
-
-		public void setCancelSlotAllocationConsumer(Consumer<SlotRequestID> cancelSlotAllocationConsumer) {
-			this.cancelSlotAllocationConsumer = Preconditions.checkNotNull(cancelSlotAllocationConsumer);
-		}
-
-		@Override
-		public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
-			final Consumer<SlotRequestID> currentCancelSlotAllocationConsumer = cancelSlotAllocationConsumer;
-
-			if (currentCancelSlotAllocationConsumer != null) {
-				currentCancelSlotAllocationConsumer.accept(slotRequestId);
-			}
-
-			return super.cancelSlotRequest(slotRequestId);
-		}
-
-		CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {
-			return callAsync(
-				() -> getAllocatedSlots().contains(allocationId),
-				timeout);
-		}
-
-		CompletableFuture<Boolean> containsAvailableSlot(AllocationID allocationId) {
-			return callAsync(
-				() -> getAvailableSlots().contains(allocationId),
-				timeout);
-		}
-
-		CompletableFuture<Integer> getNumberOfPendingRequests() {
-			return callAsync(
-				() -> getPendingRequests().size(),
-				timeout);
-		}
-
-		CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
-			return callAsync(
-				() -> getWaitingForResourceManager().size(),
-				timeout);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
deleted file mode 100644
index 1af9cce..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_MOCKS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class SlotPoolTest extends TestLogger {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SlotPoolTest.class);
-
-	private final Time timeout = Time.seconds(10L);
-
-	private RpcService rpcService;
-
-	private JobID jobId;
-
-	private TaskManagerLocation taskManagerLocation;
-
-	private TaskManagerGateway taskManagerGateway;
-
-	@Before
-	public void setUp() throws Exception {
-		this.rpcService = new TestingRpcService();
-		this.jobId = new JobID();
-
-		taskManagerLocation = new LocalTaskManagerLocation();
-		taskManagerGateway = new SimpleAckingTaskManagerGateway();
-	}
-
-	@After
-	public void tearDown() throws Exception {
-		rpcService.stopService();
-	}
-
-	@Test
-	public void testAllocateSimpleSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
-		try {
-			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
-
-			SlotRequestID requestId = new SlotRequestID();
-			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-			assertFalse(future.isDone());
-
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
-
-			final SlotOffer slotOffer = new SlotOffer(
-				slotRequest.getAllocationId(),
-				0,
-				DEFAULT_TESTING_PROFILE);
-
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
-			LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
-			assertTrue(future.isDone());
-			assertTrue(slot.isAlive());
-			assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
-		} finally {
-			slotPool.shutDown();
-		}
-	}
-
-	@Test
-	public void testAllocationFulfilledByReturnedSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
-		try {
-			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			slotPool.registerTaskManager(taskManagerLocation.getResourceID());
-
-			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-
-			assertFalse(future1.isDone());
-			assertFalse(future2.isDone());
-
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2))
-				.requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
-
-			final SlotOffer slotOffer = new SlotOffer(
-				slotRequests.get(0).getAllocationId(),
-				0,
-				DEFAULT_TESTING_PROFILE);
-
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
-			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-			assertTrue(future1.isDone());
-			assertFalse(future2.isDone());
-
-			// return this slot to pool
-			slot1.releaseSlot();
-
-			// second allocation fulfilled by previous slot returning
-			LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-			assertTrue(future2.isDone());
-
-			assertNotEquals(slot1, slot2);
-			assertFalse(slot1.isAlive());
-			assertTrue(slot2.isAlive());
-			assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
-			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
-			assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
-		} finally {
-			slotPool.shutDown();
-		}
-	}
-
-	@Test
-	public void testAllocateWithFreeSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
-		try {
-			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
-
-			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-			assertFalse(future1.isDone());
-
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
-
-			final SlotOffer slotOffer = new SlotOffer(
-				slotRequest.getAllocationId(),
-				0,
-				DEFAULT_TESTING_PROFILE);
-
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
-			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-			assertTrue(future1.isDone());
-
-			// return this slot to pool
-			slot1.releaseSlot();
-
-			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-
-			// second allocation fulfilled by previous slot returning
-			LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-			assertTrue(future2.isDone());
-
-			assertNotEquals(slot1, slot2);
-			assertFalse(slot1.isAlive());
-			assertTrue(slot2.isAlive());
-			assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
-			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
-		} finally {
-			slotPool.shutDown();
-		}
-	}
-
-	@Test
-	public void testOfferSlot() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
-		try {
-			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
-
-			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-			assertFalse(future.isDone());
-
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
-
-			final SlotOffer slotOffer = new SlotOffer(
-				slotRequest.getAllocationId(),
-				0,
-				DEFAULT_TESTING_PROFILE);
-
-			final TaskManagerLocation invalidTaskManagerLocation = new LocalTaskManagerLocation();
-
-			// slot from unregistered resource
-			assertFalse(slotPoolGateway.offerSlot(invalidTaskManagerLocation, taskManagerGateway, slotOffer).get());
-
-			final SlotOffer nonRequestedSlotOffer = new SlotOffer(
-				new AllocationID(),
-				0,
-				DEFAULT_TESTING_PROFILE);
-
-			// we'll also accept non requested slots
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer).get());
-
-			// accepted slot
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-			LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-			assertTrue(slot.isAlive());
-
-			// duplicated offer with using slot
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-			assertTrue(slot.isAlive());
-
-			// duplicated offer with free slot
-			slot.releaseSlot();
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-		} finally {
-			slotPool.shutDown();
-		}
-	}
-
-	@Test
-	public void testReleaseResource() throws Exception {
-		ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
-
-		final CompletableFuture<Boolean> slotReturnFuture = new CompletableFuture<>();
-
-		final SlotPool slotPool = new SlotPool(rpcService, jobId) {
-			@Override
-			public void returnAllocatedSlot(SlotRequestID slotRequestId) {
-				super.returnAllocatedSlot(slotRequestId);
-
-				slotReturnFuture.complete(true);
-			}
-		};
-
-		try {
-			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
-			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
-
-			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-
-			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
-			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
-			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
-
-			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-
-			final SlotOffer slotOffer = new SlotOffer(
-				slotRequest.getAllocationId(),
-				0,
-				DEFAULT_TESTING_PROFILE);
-
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
-			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-			assertTrue(future1.isDone());
-			assertFalse(future2.isDone());
-
-			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
-
-			// wait until the slot has been returned
-			slotReturnFuture.get();
-
-			assertFalse(slot1.isAlive());
-
-			// slot released and not usable, second allocation still not fulfilled
-			Thread.sleep(10);
-			assertFalse(future2.isDone());
-		} finally {
-			slotPool.shutDown();
-		}
-	}
-
-	/**
-	 * Tests that a slot request is cancelled if it failed with an exception (e.g. TimeoutException).
-	 *
-	 * <p>See FLINK-7870
-	 */
-	@Test
-	public void testSlotRequestCancellationUponFailingRequest() throws Exception {
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
-		final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
-		final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>();
-		final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>();
-
-		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-		resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
-		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
-		resourceManagerGateway.setCancelSlotConsumer(allocationID -> cancelSlotFuture.complete(allocationID));
-
-		final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
-
-		try {
-			slotPool.start(JobMasterId.generate(), "localhost");
-
-			final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
-
-			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
-
-			CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
-				new SlotRequestID(),
-				scheduledUnit,
-				ResourceProfile.UNKNOWN,
-				Collections.emptyList(),
-				timeout);
-
-			requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));
-
-			try {
-				slotFuture.get();
-				fail("The slot future should not have been completed properly.");
-			} catch (Exception ignored) {
-				// expected
-			}
-
-			// check that a failure triggered the slot request cancellation
-			// with the correct allocation id
-			assertEquals(requestSlotFutureAllocationId.get(), cancelSlotFuture.get());
-		} finally {
-			try {
-				RpcUtils.terminateRpcEndpoint(slotPool, timeout);
-			} catch (Exception e) {
-				LOG.warn("Could not properly terminate the SlotPool.", e);
-			}
-		}
-	}
-
-	/**
-	 * Tests that unused offered slots are directly used to fulfill pending slot
-	 * requests.
-	 *
-	 * <p>See FLINK-8089
-	 */
-	@Test
-	public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
-		final JobMasterId jobMasterId = JobMasterId.generate();
-		final String jobMasterAddress = "foobar";
-		final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
-		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
-
-		resourceManagerGateway.setRequestSlotConsumer(
-			(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
-
-		final SlotRequestID slotRequestId1 = new SlotRequestID();
-		final SlotRequestID slotRequestId2 = new SlotRequestID();
-
-		try {
-			slotPool.start(jobMasterId, jobMasterAddress);
-
-			final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
-
-			final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
-
-			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
-
-			CompletableFuture<LogicalSlot> slotFuture1 = slotPoolGateway.allocateSlot(
-				slotRequestId1,
-				scheduledUnit,
-				ResourceProfile.UNKNOWN,
-				Collections.emptyList(),
-				timeout);
-
-			// wait for the first slot request
-			final AllocationID allocationId = allocationIdFuture.get();
-
-			CompletableFuture<LogicalSlot> slotFuture2 = slotPoolGateway.allocateSlot(
-				slotRequestId2,
-				scheduledUnit,
-				ResourceProfile.UNKNOWN,
-				Collections.emptyList(),
-				timeout);
-
-			slotPoolGateway.cancelSlotRequest(slotRequestId1);
-
-			try {
-				// this should fail with a CancellationException
-				slotFuture1.get();
-				fail("The first slot future should have failed because it was cancelled.");
-			} catch (ExecutionException ee) {
-				assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException);
-			}
-
-			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
-
-			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
-
-			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
-
-			// the slot offer should fulfill the second slot request
-			assertEquals(allocationId, slotFuture2.get().getAllocationId());
-		} finally {
-			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
-		}
-	}
-
-	private static ResourceManagerGateway createResourceManagerGatewayMock() {
-		ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
-		when(resourceManagerGateway
-			.requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class)))
-			.thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
-
-		return resourceManagerGateway;
-	}
-
-	private static SlotPoolGateway setupSlotPool(
-			SlotPool slotPool,
-			ResourceManagerGateway resourceManagerGateway) throws Exception {
-		final String jobManagerAddress = "foobar";
-
-		slotPool.start(JobMasterId.generate(), jobManagerAddress);
-
-		slotPool.connectToResourceManager(resourceManagerGateway);
-
-		return slotPool.getSelfGateway(SlotPoolGateway.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
index 28cab72..2407c1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.instance;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
deleted file mode 100644
index 2066017..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Simple testing logical slot for testing purposes.
- */
-public class TestingLogicalSlot implements LogicalSlot {
-
-	private final TaskManagerLocation taskManagerLocation;
-
-	private final TaskManagerGateway taskManagerGateway;
-
-	private final AtomicReference<Payload> payloadReference;
-
-	private final int slotNumber;
-
-	private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
-	
-	private final AllocationID allocationId;
-
-	private final SlotRequestID slotRequestId;
-
-	public TestingLogicalSlot() {
-		this(
-			new LocalTaskManagerLocation(),
-			new SimpleAckingTaskManagerGateway(),
-			0,
-			new AllocationID(),
-			new SlotRequestID());
-	}
-
-	public TestingLogicalSlot(
-			TaskManagerLocation taskManagerLocation,
-			TaskManagerGateway taskManagerGateway,
-			int slotNumber,
-			AllocationID allocationId,
-			SlotRequestID slotRequestId) {
-		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
-		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
-		this.payloadReference = new AtomicReference<>();
-		this.slotNumber = slotNumber;
-		this.allocationId = Preconditions.checkNotNull(allocationId);
-		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
-	}
-
-	@Override
-	public TaskManagerLocation getTaskManagerLocation() {
-		return taskManagerLocation;
-	}
-
-	@Override
-	public TaskManagerGateway getTaskManagerGateway() {
-		return taskManagerGateway;
-	}
-
-	@Override
-	public boolean isAlive() {
-		return !releaseFuture.isDone();
-	}
-
-	@Override
-	public boolean tryAssignPayload(Payload payload) {
-		return payloadReference.compareAndSet(null, payload);
-	}
-
-	@Nullable
-	@Override
-	public Payload getPayload() {
-		return payloadReference.get();
-	}
-
-	@Override
-	public CompletableFuture<?> releaseSlot() {
-		releaseFuture.complete(null);
-
-		return releaseFuture;
-	}
-
-	@Override
-	public int getPhysicalSlotNumber() {
-		return slotNumber;
-	}
-
-	@Override
-	public AllocationID getAllocationId() {
-		return allocationId;
-	}
-
-	@Override
-	public SlotRequestID getSlotRequestId() {
-		return slotRequestId;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
deleted file mode 100644
index 3369882..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Simple payload implementation for testing purposes.
- */
-public class TestingPayload implements LogicalSlot.Payload {
-
-	private final CompletableFuture<?> terminationFuture;
-
-	public TestingPayload() {
-		this.terminationFuture = new CompletableFuture<>();
-	}
-
-
-	@Override
-	public void fail(Throwable cause) {
-		terminationFuture.complete(null);
-	}
-
-	@Override
-	public CompletableFuture<?> getTerminalStateFuture() {
-		return terminationFuture;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index d40ff61..77d162f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -143,7 +143,7 @@ public class CoLocationConstraintTest {
 			assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
 			
 			// release the slot
-			slot2_1.releaseInstanceSlot();
+			slot2_1.releaseSlot();
 
 			// we should still have a location
 			assertTrue(constraint.isAssigned());