You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/26 11:17:36 UTC
[flink] branch master updated: [FLINK-12179][coordination] Remove
legacy class Instance
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e6e6898 [FLINK-12179][coordination] Remove legacy class Instance
e6e6898 is described below
commit e6e6898975a892cdfed3e69cd6da0474cd712714
Author: tison <wa...@gmail.com>
AuthorDate: Fri Jul 26 19:17:04 2019 +0800
[FLINK-12179][coordination] Remove legacy class Instance
---
.../apache/flink/runtime/instance/Instance.java | 391 ---------------------
.../runtime/instance/InstanceDiedException.java | 37 --
.../instance/SlotSharingGroupAssignment.java | 2 +-
.../jobmanager/scheduler/CoLocationConstraint.java | 3 +-
.../scheduler/SlotAvailabilityListener.java | 30 --
.../flink/runtime/instance/InstanceTest.java | 194 ----------
.../scheduler/SchedulerIsolatedTasksTest.java | 4 +-
.../jobmanager/scheduler/SchedulerTestUtils.java | 10 -
8 files changed, 4 insertions(+), 667 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
deleted file mode 100644
index 6222859..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An instance represents a {@code org.apache.flink.runtime.taskmanager.TaskManager}
- * registered at a JobManager and ready to receive work.
- */
-public class Instance implements SlotOwner {
-
- private final static Logger LOG = LoggerFactory.getLogger(Instance.class);
-
- /** The lock on which to synchronize allocations and failure state changes */
- private final Object instanceLock = new Object();
-
- /** The instance gateway to communicate with the instance */
- private final TaskManagerGateway taskManagerGateway;
-
- /** The instance connection information for the data transfer. */
- private final TaskManagerLocation location;
-
- /** A description of the resources of the task manager */
- private final HardwareDescription resources;
-
- /** The ID identifying the taskManager. */
- private final InstanceID instanceId;
-
- /** The number of task slots available on the node */
- private final int numberOfSlots;
-
- /** A list of available slot positions */
- private final Queue<Integer> availableSlots;
-
- /** Allocated slots on this taskManager */
- private final Set<Slot> allocatedSlots = new HashSet<Slot>();
-
- /** A listener to be notified upon new slot availability */
- private SlotAvailabilityListener slotAvailabilityListener;
-
- /** Time when last heat beat has been received from the task manager running on this taskManager. */
- private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
-
- /** Flag marking the instance as alive or as dead. */
- private volatile boolean isDead;
-
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Constructs an instance reflecting a registered TaskManager.
- *
- * @param taskManagerGateway The actor gateway to communicate with the remote instance
- * @param location The remote connection where the task manager receives requests.
- * @param id The id under which the taskManager is registered.
- * @param resources The resources available on the machine.
- * @param numberOfSlots The number of task slots offered by this taskManager.
- */
- public Instance(
- TaskManagerGateway taskManagerGateway,
- TaskManagerLocation location,
- InstanceID id,
- HardwareDescription resources,
- int numberOfSlots) {
- this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
- this.location = Preconditions.checkNotNull(location);
- this.instanceId = Preconditions.checkNotNull(id);
- this.resources = Preconditions.checkNotNull(resources);
- this.numberOfSlots = numberOfSlots;
-
- this.availableSlots = new ArrayDeque<>(numberOfSlots);
- for (int i = 0; i < numberOfSlots; i++) {
- this.availableSlots.add(i);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Properties
- // --------------------------------------------------------------------------------------------
-
- public ResourceID getTaskManagerID() {
- return location.getResourceID();
- }
-
- public InstanceID getId() {
- return instanceId;
- }
-
- public HardwareDescription getResources() {
- return this.resources;
- }
-
- public int getTotalNumberOfSlots() {
- return numberOfSlots;
- }
-
- // --------------------------------------------------------------------------------------------
- // Life and Death
- // --------------------------------------------------------------------------------------------
-
- public boolean isAlive() {
- return !isDead;
- }
-
- public void markDead() {
-
- // create a copy of the slots to avoid concurrent modification exceptions
- List<Slot> slots;
-
- synchronized (instanceLock) {
- if (isDead) {
- return;
- }
- isDead = true;
-
- // no more notifications for the slot releasing
- this.slotAvailabilityListener = null;
-
- slots = new ArrayList<Slot>(allocatedSlots);
-
- allocatedSlots.clear();
- availableSlots.clear();
- }
-
- /*
- * releaseSlot must not own the instanceLock in order to avoid dead locks where a slot
- * owning the assignment group lock wants to give itself back to the instance which requires
- * the instance lock
- */
- final FlinkException cause = new FlinkException("Instance " + this + " has been marked as dead.");
- for (Slot slot : slots) {
- slot.releaseSlot(cause);
- }
- }
-
-
- // --------------------------------------------------------------------------------------------
- // Heartbeats
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the timestamp of the last heartbeat.
- *
- * @return The timestamp of the last heartbeat.
- */
- public long getLastHeartBeat() {
- return this.lastReceivedHeartBeat;
- }
-
- /**
- * Updates the time of last received heart beat to the current system time.
- */
- public void reportHeartBeat() {
- this.lastReceivedHeartBeat = System.currentTimeMillis();
- }
-
- /**
- * Checks whether the last heartbeat occurred within the last {@code n} milliseconds
- * before the given timestamp {@code now}.
- *
- * @param now The timestamp representing the current time.
- * @param cleanUpInterval The maximum time (in msecs) that the last heartbeat may lie in the past.
- * @return True, if this taskManager is considered alive, false otherwise.
- */
- public boolean isStillAlive(long now, long cleanUpInterval) {
- return this.lastReceivedHeartBeat + cleanUpInterval > now;
- }
-
- // --------------------------------------------------------------------------------------------
- // Resource allocation
- // --------------------------------------------------------------------------------------------
-
- /**
- * Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot
- * is available at the moment.
- *
- * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the
- * TaskManager instance has no more slots available.
- *
- * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the
- * slot is allocated.
- */
- public SimpleSlot allocateSimpleSlot() throws InstanceDiedException {
- synchronized (instanceLock) {
- if (isDead) {
- throw new InstanceDiedException(this);
- }
-
- Integer nextSlot = availableSlots.poll();
- if (nextSlot == null) {
- return null;
- }
- else {
- SimpleSlot slot = new SimpleSlot(this, location, nextSlot, taskManagerGateway);
- allocatedSlots.add(slot);
- return slot;
- }
- }
- }
-
- /**
- * Allocates a shared slot on this TaskManager instance. This method returns {@code null}, if no slot
- * is available at the moment. The shared slot will be managed by the given SlotSharingGroupAssignment.
- *
- * @param sharingGroupAssignment The assignment group that manages this shared slot.
- *
- * @return A shared slot that represents a task slot on this TaskManager instance and can hold other
- * (shared) slots, or null, if the TaskManager instance has no more slots available.
- *
- * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the slot is allocated.
- */
- public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment sharingGroupAssignment)
- throws InstanceDiedException {
-
- synchronized (instanceLock) {
- if (isDead) {
- throw new InstanceDiedException(this);
- }
-
- Integer nextSlot = availableSlots.poll();
- if (nextSlot == null) {
- return null;
- }
- else {
- SharedSlot slot = new SharedSlot(
- this,
- location,
- nextSlot,
- taskManagerGateway,
- sharingGroupAssignment);
- allocatedSlots.add(slot);
- return slot;
- }
- }
- }
-
- /**
- * Returns a slot that has been allocated from this instance. The slot needs have been canceled
- * prior to calling this method.
- *
- * <p>The method will transition the slot to the "released" state. If the slot is already in state
- * "released", this method will do nothing.</p>
- *
- * @param logicalSlot The slot to return.
- * @return Future which is completed with true, if the slot was returned, false if not.
- */
- @Override
- public void returnLogicalSlot(LogicalSlot logicalSlot) {
- checkNotNull(logicalSlot);
- checkArgument(logicalSlot instanceof Slot);
-
- final Slot slot = ((Slot) logicalSlot);
- checkArgument(!slot.isAlive(), "slot is still alive");
- checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager.");
-
- if (slot.markReleased()) {
- LOG.debug("Return allocated slot {}.", slot);
- synchronized (instanceLock) {
- if (isDead) {
- return;
- }
-
- if (this.allocatedSlots.remove(slot)) {
- this.availableSlots.add(slot.getSlotNumber());
-
- if (this.slotAvailabilityListener != null) {
- this.slotAvailabilityListener.newSlotAvailable(this);
- }
-
- }
- else {
- throw new IllegalArgumentException("Slot was not allocated from this TaskManager.");
- }
- }
- }
- }
-
- public void cancelAndReleaseAllSlots() {
- // we need to do this copy because of concurrent modification exceptions
- List<Slot> copy;
- synchronized (instanceLock) {
- copy = new ArrayList<Slot>(this.allocatedSlots);
- }
-
- final FlinkException cause = new FlinkException("Cancel and release all slots of instance " + this + '.');
- for (Slot slot : copy) {
- slot.releaseSlot(cause);
- }
- }
-
- /**
- * Returns the InstanceGateway of this Instance. This gateway can be used to communicate with
- * it.
- *
- * @return InstanceGateway associated with this instance
- */
- public TaskManagerGateway getTaskManagerGateway() {
- return taskManagerGateway;
- }
-
- public TaskManagerLocation getTaskManagerLocation() {
- return location;
- }
-
- public int getNumberOfAvailableSlots() {
- return this.availableSlots.size();
- }
-
- public int getNumberOfAllocatedSlots() {
- return this.allocatedSlots.size();
- }
-
- public boolean hasResourcesAvailable() {
- return !isDead && getNumberOfAvailableSlots() > 0;
- }
-
- // --------------------------------------------------------------------------------------------
- // Listeners
- // --------------------------------------------------------------------------------------------
-
- /**
- * Sets the listener that receives notifications for slot availability.
- *
- * @param slotAvailabilityListener The listener.
- */
- public void setSlotAvailabilityListener(SlotAvailabilityListener slotAvailabilityListener) {
- synchronized (instanceLock) {
- if (this.slotAvailabilityListener != null) {
- throw new IllegalStateException("Instance has already a slot listener.");
- } else {
- this.slotAvailabilityListener = slotAvailabilityListener;
- }
- }
- }
-
- /**
- * Removes the listener that receives notifications for slot availability.
- */
- public void removeSlotListener() {
- synchronized (instanceLock) {
- this.slotAvailabilityListener = null;
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Standard Utilities
- // --------------------------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return String.format("%s @ %s - %d slots - URL: %s", instanceId, location.getHostname(),
- numberOfSlots, (taskManagerGateway != null ? taskManagerGateway.getAddress() : "No instance gateway"));
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
deleted file mode 100644
index bdcbbd5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-/**
- * A special instance signaling that an attempted operation on an instance is not possible,
- * because the instance has died.
- */
-public class InstanceDiedException extends Exception {
- private static final long serialVersionUID = -4917918318403135745L;
-
- private final Instance instance;
-
- public InstanceDiedException(Instance instance) {
- this.instance = instance;
- }
-
- public Instance getInstance() {
- return instance;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 290ccc5..d16c332 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -106,7 +106,7 @@ public class SlotSharingGroupAssignment {
/**
* Gets the number of slots that are currently governed by this assignment group.
- * This refers to the slots allocated from an {@link org.apache.flink.runtime.instance.Instance},
+ * This refers to the slots allocated from an Instance,
* and not the sub-slots given out as children of those shared slots.
*
* @return The number of resource slots managed by this assignment group.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 8750749..0e9f585 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.jobmanager.scheduler;
-import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
@@ -37,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* A CoLocationConstraint manages the location of a set of tasks
* (Execution Vertices). In co-location groups, the different subtasks of
- * different JobVertices need to be executed on the same {@link Instance}.
+ * different JobVertices need to be executed on the same slot.
* This is realized by creating a special shared slot that holds these tasks.
*
* <p>This class tracks the location and the shared slot for this set of tasks.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
deleted file mode 100644
index 0f02748..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.instance.Instance;
-
-/**
- * A SlotAvailabilityListener can be notified when new
- * {@link org.apache.flink.runtime.instance.Slot}s become available on an {@link Instance}.
- */
-public interface SlotAvailabilityListener {
-
- void newSlotAvailable(Instance instance);
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
deleted file mode 100644
index 097cab5..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import org.junit.Test;
-
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the {@link Instance} class.
- */
-public class InstanceTest {
-
- @Test
- public void testAllocatingAndCancellingSlots() {
- try {
- ResourceID resourceID = ResourceID.generate();
- HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
- InetAddress address = InetAddress.getByName("127.0.0.1");
- TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
-
- Instance instance = new Instance(
- new SimpleAckingTaskManagerGateway(),
- connection,
- new InstanceID(),
- hardwareDescription,
- 4);
-
- assertEquals(4, instance.getTotalNumberOfSlots());
- assertEquals(4, instance.getNumberOfAvailableSlots());
- assertEquals(0, instance.getNumberOfAllocatedSlots());
-
- SimpleSlot slot1 = instance.allocateSimpleSlot();
- SimpleSlot slot2 = instance.allocateSimpleSlot();
- SimpleSlot slot3 = instance.allocateSimpleSlot();
- SimpleSlot slot4 = instance.allocateSimpleSlot();
-
- assertNotNull(slot1);
- assertNotNull(slot2);
- assertNotNull(slot3);
- assertNotNull(slot4);
-
- assertEquals(0, instance.getNumberOfAvailableSlots());
- assertEquals(4, instance.getNumberOfAllocatedSlots());
- assertEquals(6, slot1.getSlotNumber() + slot2.getSlotNumber() +
- slot3.getSlotNumber() + slot4.getSlotNumber());
-
- // no more slots
- assertNull(instance.allocateSimpleSlot());
- try {
- instance.returnLogicalSlot(slot2);
- fail("instance accepted a non-cancelled slot.");
- }
- catch (IllegalArgumentException e) {
- // good
- }
-
- // release the slots. this returns them to the instance
- slot1.releaseSlot();
- slot2.releaseSlot();
- slot3.releaseSlot();
- slot4.releaseSlot();
-
- assertEquals(4, instance.getNumberOfAvailableSlots());
- assertEquals(0, instance.getNumberOfAllocatedSlots());
-
- instance.returnLogicalSlot(slot1);
- instance.returnLogicalSlot(slot2);
- instance.returnLogicalSlot(slot3);
- instance.returnLogicalSlot(slot4);
-
- assertEquals(4, instance.getNumberOfAvailableSlots());
- assertEquals(0, instance.getNumberOfAllocatedSlots());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testInstanceDies() {
- try {
- ResourceID resourceID = ResourceID.generate();
- HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
- InetAddress address = InetAddress.getByName("127.0.0.1");
- TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
-
- Instance instance = new Instance(
- new SimpleAckingTaskManagerGateway(),
- connection,
- new InstanceID(),
- hardwareDescription,
- 3);
-
- assertEquals(3, instance.getNumberOfAvailableSlots());
-
- SimpleSlot slot1 = instance.allocateSimpleSlot();
- SimpleSlot slot2 = instance.allocateSimpleSlot();
- SimpleSlot slot3 = instance.allocateSimpleSlot();
-
- instance.markDead();
-
- assertEquals(0, instance.getNumberOfAllocatedSlots());
- assertEquals(0, instance.getNumberOfAvailableSlots());
-
- assertTrue(slot1.isCanceled());
- assertTrue(slot2.isCanceled());
- assertTrue(slot3.isCanceled());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testCancelAllSlots() {
- try {
- ResourceID resourceID = ResourceID.generate();
- HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
- InetAddress address = InetAddress.getByName("127.0.0.1");
- TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
-
- Instance instance = new Instance(
- new SimpleAckingTaskManagerGateway(),
- connection,
- new InstanceID(),
- hardwareDescription,
- 3);
-
- assertEquals(3, instance.getNumberOfAvailableSlots());
-
- SimpleSlot slot1 = instance.allocateSimpleSlot();
- SimpleSlot slot2 = instance.allocateSimpleSlot();
- SimpleSlot slot3 = instance.allocateSimpleSlot();
-
- instance.cancelAndReleaseAllSlots();
-
- assertEquals(3, instance.getNumberOfAvailableSlots());
-
- assertTrue(slot1.isCanceled());
- assertTrue(slot2.isCanceled());
- assertTrue(slot3.isCanceled());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * It is crucial for some portions of the code that instance objects do not override equals and
- * are only considered equal, if the references are equal.
- */
- @Test
- public void testInstancesReferenceEqual() {
- try {
- Method m = Instance.class.getMethod("equals", Object.class);
- assertTrue(m.getDeclaringClass() == Object.class);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index ad2810f..75f9a70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -234,7 +234,7 @@ public class SchedulerIsolatedTasksTest extends SchedulerTestBase {
final TaskManagerLocation taskManagerLocation3 = testingSlotProvider.addTaskManager(2);
// schedule something on an arbitrary instance
- LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
+ LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(new LocalTaskManagerLocation())), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
// figure out how we use the location hints
ResourceID firstResourceId = s1.getTaskManagerLocation().getResourceID();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 75cb9c8..32defe3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -31,7 +30,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,14 +59,6 @@ public class SchedulerTestUtils {
return execution;
}
- public static Execution getTestVertex(Instance... preferredInstances) {
- List<TaskManagerLocation> locations = new ArrayList<>(preferredInstances.length);
- for (Instance i : preferredInstances) {
- locations.add(i.getTaskManagerLocation());
- }
- return getTestVertex(locations);
- }
-
public static Execution getTestVertex(TaskManagerLocation... preferredLocations) {
return getTestVertex(Arrays.asList(preferredLocations));
}