You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/26 11:17:36 UTC

[flink] branch master updated: [FLINK-12179][coordination] Remove legacy class Instance

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e6e6898  [FLINK-12179][coordination] Remove legacy class Instance
e6e6898 is described below

commit e6e6898975a892cdfed3e69cd6da0474cd712714
Author: tison <wa...@gmail.com>
AuthorDate: Fri Jul 26 19:17:04 2019 +0800

    [FLINK-12179][coordination] Remove legacy class Instance
---
 .../apache/flink/runtime/instance/Instance.java    | 391 ---------------------
 .../runtime/instance/InstanceDiedException.java    |  37 --
 .../instance/SlotSharingGroupAssignment.java       |   2 +-
 .../jobmanager/scheduler/CoLocationConstraint.java |   3 +-
 .../scheduler/SlotAvailabilityListener.java        |  30 --
 .../flink/runtime/instance/InstanceTest.java       | 194 ----------
 .../scheduler/SchedulerIsolatedTasksTest.java      |   4 +-
 .../jobmanager/scheduler/SchedulerTestUtils.java   |  10 -
 8 files changed, 4 insertions(+), 667 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
deleted file mode 100644
index 6222859..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An instance represents a {@code org.apache.flink.runtime.taskmanager.TaskManager}
- * registered at a JobManager and ready to receive work.
- */
-public class Instance implements SlotOwner {
-
-	private final static Logger LOG = LoggerFactory.getLogger(Instance.class);
-
-	/** The lock on which to synchronize allocations and failure state changes */
-	private final Object instanceLock = new Object();
-
-	/** The instance gateway to communicate with the instance */
-	private final TaskManagerGateway taskManagerGateway;
-
-	/** The instance connection information for the data transfer. */
-	private final TaskManagerLocation location;
-
-	/** A description of the resources of the task manager */
-	private final HardwareDescription resources;
-
-	/** The ID identifying the taskManager. */
-	private final InstanceID instanceId;
-
-	/** The number of task slots available on the node */
-	private final int numberOfSlots;
-
-	/** A list of available slot positions */
-	private final Queue<Integer> availableSlots;
-
-	/** Allocated slots on this taskManager */
-	private final Set<Slot> allocatedSlots = new HashSet<Slot>();
-
-	/** A listener to be notified upon new slot availability */
-	private SlotAvailabilityListener slotAvailabilityListener;
-
-	/** Time when last heat beat has been received from the task manager running on this taskManager. */
-	private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
-
-	/** Flag marking the instance as alive or as dead. */
-	private volatile boolean isDead;
-
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Constructs an instance reflecting a registered TaskManager.
-	 *
-	 * @param taskManagerGateway The actor gateway to communicate with the remote instance
-	 * @param location The remote connection where the task manager receives requests.
-	 * @param id The id under which the taskManager is registered.
-	 * @param resources The resources available on the machine.
-	 * @param numberOfSlots The number of task slots offered by this taskManager.
-	 */
-	public Instance(
-			TaskManagerGateway taskManagerGateway,
-			TaskManagerLocation location,
-			InstanceID id,
-			HardwareDescription resources,
-			int numberOfSlots) {
-		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
-		this.location = Preconditions.checkNotNull(location);
-		this.instanceId = Preconditions.checkNotNull(id);
-		this.resources = Preconditions.checkNotNull(resources);
-		this.numberOfSlots = numberOfSlots;
-
-		this.availableSlots = new ArrayDeque<>(numberOfSlots);
-		for (int i = 0; i < numberOfSlots; i++) {
-			this.availableSlots.add(i);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Properties
-	// --------------------------------------------------------------------------------------------
-
-	public ResourceID getTaskManagerID() {
-		return location.getResourceID();
-	}
-
-	public InstanceID getId() {
-		return instanceId;
-	}
-
-	public HardwareDescription getResources() {
-		return this.resources;
-	}
-
-	public int getTotalNumberOfSlots() {
-		return numberOfSlots;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Life and Death
-	// --------------------------------------------------------------------------------------------
-
-	public boolean isAlive() {
-		return !isDead;
-	}
-
-	public void markDead() {
-
-		// create a copy of the slots to avoid concurrent modification exceptions
-		List<Slot> slots;
-
-		synchronized (instanceLock) {
-			if (isDead) {
-				return;
-			}
-			isDead = true;
-
-			// no more notifications for the slot releasing
-			this.slotAvailabilityListener = null;
-
-			slots = new ArrayList<Slot>(allocatedSlots);
-
-			allocatedSlots.clear();
-			availableSlots.clear();
-		}
-
-		/*
-		 * releaseSlot must not own the instanceLock in order to avoid dead locks where a slot
-		 * owning the assignment group lock wants to give itself back to the instance which requires
-		 * the instance lock
-		 */
-		final FlinkException cause = new FlinkException("Instance " + this + " has been marked as dead.");
-		for (Slot slot : slots) {
-			slot.releaseSlot(cause);
-		}
-	}
-
-
-	// --------------------------------------------------------------------------------------------
-	// Heartbeats
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Gets the timestamp of the last heartbeat.
-	 *
-	 * @return The timestamp of the last heartbeat.
-	 */
-	public long getLastHeartBeat() {
-		return this.lastReceivedHeartBeat;
-	}
-
-	/**
-	 * Updates the time of last received heart beat to the current system time.
-	 */
-	public void reportHeartBeat() {
-		this.lastReceivedHeartBeat = System.currentTimeMillis();
-	}
-
-	/**
-	 * Checks whether the last heartbeat occurred within the last {@code n} milliseconds
-	 * before the given timestamp {@code now}.
-	 *
-	 * @param now The timestamp representing the current time.
-	 * @param cleanUpInterval The maximum time (in msecs) that the last heartbeat may lie in the past.
-	 * @return True, if this taskManager is considered alive, false otherwise.
-	 */
-	public boolean isStillAlive(long now, long cleanUpInterval) {
-		return this.lastReceivedHeartBeat + cleanUpInterval > now;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Resource allocation
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot
-	 * is available at the moment.
-	 *
-	 * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the
-	 *         TaskManager instance has no more slots available.
-	 *
-	 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the
-	 *                               slot is allocated. 
-	 */
-	public SimpleSlot allocateSimpleSlot() throws InstanceDiedException {
-		synchronized (instanceLock) {
-			if (isDead) {
-				throw new InstanceDiedException(this);
-			}
-
-			Integer nextSlot = availableSlots.poll();
-			if (nextSlot == null) {
-				return null;
-			}
-			else {
-				SimpleSlot slot = new SimpleSlot(this, location, nextSlot, taskManagerGateway);
-				allocatedSlots.add(slot);
-				return slot;
-			}
-		}
-	}
-
-	/**
-	 * Allocates a shared slot on this TaskManager instance. This method returns {@code null}, if no slot
-	 * is available at the moment. The shared slot will be managed by the given  SlotSharingGroupAssignment.
-	 *
-	 * @param sharingGroupAssignment The assignment group that manages this shared slot.
-	 *
-	 * @return A shared slot that represents a task slot on this TaskManager instance and can hold other
-	 *         (shared) slots, or null, if the TaskManager instance has no more slots available.
-	 *
-	 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the slot is allocated. 
-	 */
-	public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment sharingGroupAssignment)
-			throws InstanceDiedException {
-
-		synchronized (instanceLock) {
-			if (isDead) {
-				throw new InstanceDiedException(this);
-			}
-
-			Integer nextSlot = availableSlots.poll();
-			if (nextSlot == null) {
-				return null;
-			}
-			else {
-				SharedSlot slot = new SharedSlot(
-					this,
-					location,
-					nextSlot,
-					taskManagerGateway,
-					sharingGroupAssignment);
-				allocatedSlots.add(slot);
-				return slot;
-			}
-		}
-	}
-
-	/**
-	 * Returns a slot that has been allocated from this instance. The slot needs have been canceled
-	 * prior to calling this method.
-	 * 
-	 * <p>The method will transition the slot to the "released" state. If the slot is already in state
-	 * "released", this method will do nothing.</p>
-	 * 
-	 * @param logicalSlot The slot to return.
-	 * @return Future which is completed with true, if the slot was returned, false if not.
-	 */
-	@Override
-	public void returnLogicalSlot(LogicalSlot logicalSlot) {
-		checkNotNull(logicalSlot);
-		checkArgument(logicalSlot instanceof Slot);
-
-		final Slot slot = ((Slot) logicalSlot);
-		checkArgument(!slot.isAlive(), "slot is still alive");
-		checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager.");
-
-		if (slot.markReleased()) {
-			LOG.debug("Return allocated slot {}.", slot);
-			synchronized (instanceLock) {
-				if (isDead) {
-					return;
-				}
-
-				if (this.allocatedSlots.remove(slot)) {
-					this.availableSlots.add(slot.getSlotNumber());
-
-					if (this.slotAvailabilityListener != null) {
-						this.slotAvailabilityListener.newSlotAvailable(this);
-					}
-
-				}
-				else {
-					throw new IllegalArgumentException("Slot was not allocated from this TaskManager.");
-				}
-			}
-		}
-	}
-
-	public void cancelAndReleaseAllSlots() {
-		// we need to do this copy because of concurrent modification exceptions
-		List<Slot> copy;
-		synchronized (instanceLock) {
-			copy = new ArrayList<Slot>(this.allocatedSlots);
-		}
-
-		final FlinkException cause = new FlinkException("Cancel and release all slots of instance " + this + '.');
-		for (Slot slot : copy) {
-			slot.releaseSlot(cause);
-		}
-	}
-
-	/**
-	 * Returns the InstanceGateway of this Instance. This gateway can be used to communicate with
-	 * it.
-	 *
-	 * @return InstanceGateway associated with this instance
-	 */
-	public TaskManagerGateway getTaskManagerGateway() {
-		return taskManagerGateway;
-	}
-
-	public TaskManagerLocation getTaskManagerLocation() {
-		return location;
-	}
-
-	public int getNumberOfAvailableSlots() {
-		return this.availableSlots.size();
-	}
-
-	public int getNumberOfAllocatedSlots() {
-		return this.allocatedSlots.size();
-	}
-
-	public boolean hasResourcesAvailable() {
-		return !isDead && getNumberOfAvailableSlots() > 0;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Listeners
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Sets the listener that receives notifications for slot availability.
-	 * 
-	 * @param slotAvailabilityListener The listener.
-	 */
-	public void setSlotAvailabilityListener(SlotAvailabilityListener slotAvailabilityListener) {
-		synchronized (instanceLock) {
-			if (this.slotAvailabilityListener != null) {
-				throw new IllegalStateException("Instance has already a slot listener.");
-			} else {
-				this.slotAvailabilityListener = slotAvailabilityListener;
-			}
-		}
-	}
-
-	/**
-	 * Removes the listener that receives notifications for slot availability.
-	 */
-	public void removeSlotListener() {
-		synchronized (instanceLock) {
-			this.slotAvailabilityListener = null;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Standard Utilities
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return String.format("%s @ %s - %d slots - URL: %s", instanceId, location.getHostname(),
-				numberOfSlots, (taskManagerGateway != null ? taskManagerGateway.getAddress() : "No instance gateway"));
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
deleted file mode 100644
index bdcbbd5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-/**
- * A special instance signaling that an attempted operation on an instance is not possible,
- * because the instance has died.
- */
-public class InstanceDiedException extends Exception {
-	private static final long serialVersionUID = -4917918318403135745L;
-	
-	private final Instance instance;
-
-	public InstanceDiedException(Instance instance) {
-		this.instance = instance;
-	}
-	
-	public Instance getInstance() {
-		return instance;
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 290ccc5..d16c332 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -106,7 +106,7 @@ public class SlotSharingGroupAssignment {
 
 	/**
 	 * Gets the number of slots that are currently governed by this assignment group.
-	 * This refers to the slots allocated from an {@link org.apache.flink.runtime.instance.Instance},
+	 * This refers to the slots allocated from an Instance,
 	 * and not the sub-slots given out as children of those shared slots.
 	 * 
 	 * @return The number of resource slots managed by this assignment group.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 8750749..0e9f585 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SharedSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
@@ -37,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * A CoLocationConstraint manages the location of a set of tasks
  * (Execution Vertices). In co-location groups, the different subtasks of
- * different JobVertices need to be executed on the same {@link Instance}.
+ * different JobVertices need to be executed on the same slot.
  * This is realized by creating a special shared slot that holds these tasks.
  * 
  * <p>This class tracks the location and the shared slot for this set of tasks.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
deleted file mode 100644
index 0f02748..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.instance.Instance;
-
-/**
- * A SlotAvailabilityListener can be notified when new
- * {@link org.apache.flink.runtime.instance.Slot}s become available on an {@link Instance}.
- */
-public interface SlotAvailabilityListener {
-
-	void newSlotAvailable(Instance instance);
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
deleted file mode 100644
index 097cab5..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import org.junit.Test;
-
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the {@link Instance} class.
- */
-public class InstanceTest {
-
-	@Test
-	public void testAllocatingAndCancellingSlots() {
-		try {
-			ResourceID resourceID = ResourceID.generate();
-			HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
-
-			Instance instance = new Instance(
-				new SimpleAckingTaskManagerGateway(),
-				connection,
-				new InstanceID(),
-				hardwareDescription,
-				4);
-
-			assertEquals(4, instance.getTotalNumberOfSlots());
-			assertEquals(4, instance.getNumberOfAvailableSlots());
-			assertEquals(0, instance.getNumberOfAllocatedSlots());
-
-			SimpleSlot slot1 = instance.allocateSimpleSlot();
-			SimpleSlot slot2 = instance.allocateSimpleSlot();
-			SimpleSlot slot3 = instance.allocateSimpleSlot();
-			SimpleSlot slot4 = instance.allocateSimpleSlot();
-
-			assertNotNull(slot1);
-			assertNotNull(slot2);
-			assertNotNull(slot3);
-			assertNotNull(slot4);
-
-			assertEquals(0, instance.getNumberOfAvailableSlots());
-			assertEquals(4, instance.getNumberOfAllocatedSlots());
-			assertEquals(6, slot1.getSlotNumber() + slot2.getSlotNumber() +
-					slot3.getSlotNumber() + slot4.getSlotNumber());
-
-			// no more slots
-			assertNull(instance.allocateSimpleSlot());
-			try {
-				instance.returnLogicalSlot(slot2);
-				fail("instance accepted a non-cancelled slot.");
-			}
-			catch (IllegalArgumentException e) {
-				// good
-			}
-
-			// release the slots. this returns them to the instance
-			slot1.releaseSlot();
-			slot2.releaseSlot();
-			slot3.releaseSlot();
-			slot4.releaseSlot();
-
-			assertEquals(4, instance.getNumberOfAvailableSlots());
-			assertEquals(0, instance.getNumberOfAllocatedSlots());
-
-			instance.returnLogicalSlot(slot1);
-			instance.returnLogicalSlot(slot2);
-			instance.returnLogicalSlot(slot3);
-			instance.returnLogicalSlot(slot4);
-
-			assertEquals(4, instance.getNumberOfAvailableSlots());
-			assertEquals(0, instance.getNumberOfAllocatedSlots());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testInstanceDies() {
-		try {
-			ResourceID resourceID = ResourceID.generate();
-			HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
-
-			Instance instance = new Instance(
-				new SimpleAckingTaskManagerGateway(),
-				connection,
-				new InstanceID(),
-				hardwareDescription,
-				3);
-
-			assertEquals(3, instance.getNumberOfAvailableSlots());
-
-			SimpleSlot slot1 = instance.allocateSimpleSlot();
-			SimpleSlot slot2 = instance.allocateSimpleSlot();
-			SimpleSlot slot3 = instance.allocateSimpleSlot();
-
-			instance.markDead();
-
-			assertEquals(0, instance.getNumberOfAllocatedSlots());
-			assertEquals(0, instance.getNumberOfAvailableSlots());
-
-			assertTrue(slot1.isCanceled());
-			assertTrue(slot2.isCanceled());
-			assertTrue(slot3.isCanceled());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testCancelAllSlots() {
-		try {
-			ResourceID resourceID = ResourceID.generate();
-			HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
-
-			Instance instance = new Instance(
-				new SimpleAckingTaskManagerGateway(),
-				connection,
-				new InstanceID(),
-				hardwareDescription,
-				3);
-
-			assertEquals(3, instance.getNumberOfAvailableSlots());
-
-			SimpleSlot slot1 = instance.allocateSimpleSlot();
-			SimpleSlot slot2 = instance.allocateSimpleSlot();
-			SimpleSlot slot3 = instance.allocateSimpleSlot();
-
-			instance.cancelAndReleaseAllSlots();
-
-			assertEquals(3, instance.getNumberOfAvailableSlots());
-
-			assertTrue(slot1.isCanceled());
-			assertTrue(slot2.isCanceled());
-			assertTrue(slot3.isCanceled());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * It is crucial for some portions of the code that instance objects do not override equals and
-	 * are only considered equal, if the references are equal.
-	 */
-	@Test
-	public void testInstancesReferenceEqual() {
-		try {
-			Method m = Instance.class.getMethod("equals", Object.class);
-			assertTrue(m.getDeclaringClass() == Object.class);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index ad2810f..75f9a70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
@@ -234,7 +234,7 @@ public class SchedulerIsolatedTasksTest extends SchedulerTestBase {
 		final TaskManagerLocation taskManagerLocation3 = testingSlotProvider.addTaskManager(2);
 
 		// schedule something on an arbitrary instance
-		LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+		LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(new LocalTaskManagerLocation())), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
 
 		// figure out how we use the location hints
 		ResourceID firstResourceId = s1.getTaskManagerLocation().getResourceID();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 75cb9c8..32defe3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -31,7 +30,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -61,14 +59,6 @@ public class SchedulerTestUtils {
 		return execution;
 	}
 
-	public static Execution getTestVertex(Instance... preferredInstances) {
-		List<TaskManagerLocation> locations = new ArrayList<>(preferredInstances.length);
-		for (Instance i : preferredInstances) {
-			locations.add(i.getTaskManagerLocation());
-		}
-		return getTestVertex(locations);
-	}
-
 	public static Execution getTestVertex(TaskManagerLocation... preferredLocations) {
 		return getTestVertex(Arrays.asList(preferredLocations));
 	}