You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/05 13:14:30 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13508: [FLINK-19308][coordination] Add SlotTracker

tillrohrmann commented on a change in pull request #13508:
URL: https://github.com/apache/flink/pull/13508#discussion_r499586350



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -343,6 +344,7 @@ public void testRunningJobsRegistryCleanup() throws Exception {
 	 * before a new job with the same {@link JobID} is started.
 	 */
 	@Test
+	@Ignore

Review comment:
       Do we know why this test is unstable?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Default SlotTracker implementation.
+ */
+class DefaultSlotTracker implements SlotTracker {
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultSlotTracker.class);
+
+	/**
+	 * Map for all registered slots.
+	 */
+	private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new HashMap<>();
+
+	/**
+	 * Index of all currently free slots.
+	 */
+	private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new LinkedHashMap<>();
+
+	private final SlotStatusUpdateListener slotStatusUpdateListener;
+
+	private final SlotStatusStateReconciler slotStatusStateReconciler = new SlotStatusStateReconciler(this::transitionSlotToFree, this::transitionSlotToPending, this::transitionSlotToAllocated);
+
+	public DefaultSlotTracker(SlotStatusUpdateListener slotStatusUpdateListener) {

Review comment:
       Is it because we now need to introduce a factory in order to inject a custom tracker into the `SlotManager`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Default SlotTracker implementation.
+ */
+class DefaultSlotTracker implements SlotTracker {
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultSlotTracker.class);
+
+	/**
+	 * Map for all registered slots.
+	 */
+	private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new HashMap<>();
+
+	/**
+	 * Index of all currently free slots.
+	 */
+	private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new LinkedHashMap<>();
+
+	private final SlotStatusUpdateListener slotStatusUpdateListener;
+
+	private final SlotStatusStateReconciler slotStatusStateReconciler = new SlotStatusStateReconciler(this::transitionSlotToFree, this::transitionSlotToPending, this::transitionSlotToAllocated);
+
+	public DefaultSlotTracker(SlotStatusUpdateListener slotStatusUpdateListener) {
+		this.slotStatusUpdateListener = Preconditions.checkNotNull(slotStatusUpdateListener);
+	}
+
+	@Override
+	public void addSlot(
+		SlotID slotId,
+		ResourceProfile resourceProfile,
+		TaskExecutorConnection taskManagerConnection,
+		@Nullable JobID assignedJob) {
+		Preconditions.checkNotNull(slotId);
+		Preconditions.checkNotNull(resourceProfile);
+		Preconditions.checkNotNull(taskManagerConnection);
+
+		if (slots.containsKey(slotId)) {
+			// remove the old slot first
+			LOG.debug("A slot was added with an already tracked slot ID {}. Removing previous entry.", slotId);
+			removeSlot(slotId);
+		}
+
+		DeclarativeTaskManagerSlot slot = new DeclarativeTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
+		slots.put(slotId, slot);
+		freeSlots.put(slotId, slot);
+		slotStatusStateReconciler.executeStateTransition(slot, assignedJob);
+	}
+
+	@Override
+	public void removeSlots(Iterable<SlotID> slotsToRemove) {
+		Preconditions.checkNotNull(slotsToRemove);
+
+		for (SlotID slotId : slotsToRemove) {
+			removeSlot(slotId);
+		}
+	}
+
+	private void removeSlot(SlotID slotId) {
+		DeclarativeTaskManagerSlot slot = slots.remove(slotId);
+
+		if (slot != null) {
+			if (slot.getState() != SlotState.FREE) {
+				transitionSlotToFree(slot);
+			}
+			freeSlots.remove(slotId);
+		} else {
+			LOG.debug("There was no slot registered with slot id {}.", slotId);
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// ResourceManager slot status API - optimistically trigger transitions, but they may not represent true state on task executors
+	// ---------------------------------------------------------------------------------------------
+
+	@Override
+	public void notifyFree(SlotID slotId) {
+		Preconditions.checkNotNull(slotId);
+		transitionSlotToFree(slots.get(slotId));
+	}
+
+	@Override
+	public void notifyAllocationStart(SlotID slotId, JobID jobId) {
+		Preconditions.checkNotNull(slotId);
+		Preconditions.checkNotNull(jobId);
+		transitionSlotToPending(slots.get(slotId), jobId);
+	}
+
+	@Override
+	public void notifyAllocationComplete(SlotID slotId, JobID jobId) {
+		Preconditions.checkNotNull(slotId);
+		Preconditions.checkNotNull(jobId);
+		transitionSlotToAllocated(slots.get(slotId), jobId);
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// TaskExecutor slot status API - acts as source of truth
+	// ---------------------------------------------------------------------------------------------
+
+	@Override
+	public void notifySlotStatus(Iterable<SlotStatus> slotStatuses) {
+		Preconditions.checkNotNull(slotStatuses);
+		for (SlotStatus slotStatus : slotStatuses) {
+			slotStatusStateReconciler.executeStateTransition(slots.get(slotStatus.getSlotID()), slotStatus.getJobID());
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// Core state transitions
+	// ---------------------------------------------------------------------------------------------
+
+	private void transitionSlotToFree(DeclarativeTaskManagerSlot slot) {
+		Preconditions.checkNotNull(slot);
+		Preconditions.checkState(slot.getState() != SlotState.FREE);
+
+		// remember the slots current job and state for the notification, as this information will be cleared from
+		// the slot upon freeing
+		final JobID jobId = slot.getJobId();
+		final SlotState state = slot.getState();
+
+		slot.freeSlot();
+		freeSlots.put(slot.getSlotId(), slot);
+		slotStatusUpdateListener.notifySlotStatusChange(slot, state, SlotState.FREE, jobId);
+	}
+
+	private void transitionSlotToPending(DeclarativeTaskManagerSlot slot, JobID jobId) {
+		Preconditions.checkNotNull(slot);
+		Preconditions.checkState(slot.getState() == SlotState.FREE);
+
+		slot.startAllocation(jobId);
+		freeSlots.remove(slot.getSlotId());
+		slotStatusUpdateListener.notifySlotStatusChange(slot, SlotState.FREE, SlotState.PENDING, jobId);
+	}
+
+	private void transitionSlotToAllocated(DeclarativeTaskManagerSlot slot, JobID jobId) {
+		Preconditions.checkNotNull(slot);
+		Preconditions.checkState(slot.getState() == SlotState.PENDING);
+		Preconditions.checkState(jobId.equals(slot.getJobId()));
+
+		slotStatusUpdateListener.notifySlotStatusChange(slot, SlotState.PENDING, SlotState.ALLOCATED, jobId);
+		slot.completeAllocation();

Review comment:
       Does it make a difference whether `slotStatusUpdateListener.notifySlotStatusChange` is called before effectuating the state change on `slot`? I am asking because `transitionSlotToPending` and `transitionSlotToFree` do it in the opposite order.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Queue;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultSlotTracker}.
+ */
+public class DefaultSlotTrackerTest extends TestLogger {
+
+	private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION = new TaskExecutorConnection(
+		ResourceID.generate(),
+		new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
+
+	private static final JobID jobId = new JobID();
+
+	@Test
+	public void testInitialBehavior() {
+		SlotTracker tracker = new DefaultSlotTracker((slot, previous, current, jobId) -> {});
+
+		assertThat(tracker.getFreeSlots(), empty());
+	}
+
+	@Test
+	public void testSlotAddition() {
+		SlotTracker tracker = new DefaultSlotTracker((slot, previous, current, jobId) -> {});
+
+		SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+		SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1);
+
+		tracker.addSlot(slotId1, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+		tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+
+		assertThat(tracker.getFreeSlots(), containsInAnyOrder(Arrays.asList(infoWithSlotId(slotId1), infoWithSlotId(slotId2))));
+	}
+
+	@Test
+	public void testSlotRemoval() {
+		Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
+		DefaultSlotTracker tracker = new DefaultSlotTracker((slot, previous, current, jobId) ->
+			stateTransitions.add(new SlotStateTransition(slot.getSlotId(), previous, current, jobId)));
+
+		SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+		SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1);
+		SlotID slotId3 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 2);
+
+		tracker.addSlot(slotId1, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+		tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+		tracker.addSlot(slotId3, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+
+		tracker.notifyAllocationStart(slotId2, jobId);
+		tracker.notifyAllocationStart(slotId3, jobId);
+		tracker.notifyAllocationComplete(slotId3, jobId);
+
+		// the transitions to this point are not relevant for this test
+		stateTransitions.clear();
+		// we now have 1 slot in each slot state (free, pending, allocated)
+		// it should be possible to remove slots regardless of their state
+		tracker.removeSlots(Arrays.asList(slotId1, slotId2, slotId3));
+
+		assertThat(tracker.getFreeSlots(), empty());
+		assertThat(tracker.areMapsEmpty(), is(true));
+
+		assertThat(stateTransitions, containsInAnyOrder(
+			new SlotStateTransition(slotId2, SlotState.PENDING, SlotState.FREE, jobId),
+			new SlotStateTransition(slotId3, SlotState.ALLOCATED, SlotState.FREE, jobId)
+		));
+	}
+
+	@Test
+	public void testAllocationCompletion() {
+		Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
+		SlotTracker tracker = new DefaultSlotTracker((slot, previous, current, jobId) ->
+			stateTransitions.add(new SlotStateTransition(slot.getSlotId(), previous, current, jobId)));
+
+		SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+
+		tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+
+		tracker.notifyAllocationStart(slotId, jobId);
+		assertThat(tracker.getFreeSlots(), empty());
+		assertThat(stateTransitions.remove(), is(new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+
+		tracker.notifyAllocationComplete(slotId, jobId);
+		assertThat(tracker.getFreeSlots(), empty());
+		assertThat(stateTransitions.remove(), is(new SlotStateTransition(slotId, SlotState.PENDING, SlotState.ALLOCATED, jobId)));
+
+		tracker.notifyFree(slotId);
+
+		assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId)));
+		assertThat(stateTransitions.remove(), is(new SlotStateTransition(slotId, SlotState.ALLOCATED, SlotState.FREE, jobId)));
+	}
+
+	@Test
+	public void testAllocationCompletionForDifferentJobThrowsIllegalStateException() {
+		SlotTracker tracker = new DefaultSlotTracker((slot, previous, current, jobId) -> {});
+
+		SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+
+		tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+
+		tracker.notifyAllocationStart(slotId, new JobID());
+		try {
+			tracker.notifyAllocationComplete(slotId, new JobID());
+			fail("Allocations must not be completed for a different job ID.");
+		} catch (IllegalStateException expected) {
+		}
+	}
+
+	@Test
+	public void testAllocationCancellation() {
+		Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
+		SlotTracker tracker = new DefaultSlotTracker((slot, previous, current, jobId) ->
+			stateTransitions.add(new SlotStateTransition(slot.getSlotId(), previous, current, jobId)));
+
+		SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+
+		tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+
+		tracker.notifyAllocationStart(slotId, jobId);
+		assertThat(tracker.getFreeSlots(), empty());
+		assertThat(stateTransitions.remove(), is(new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+
+		tracker.notifyFree(slotId);
+		assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId)));
+		assertThat(stateTransitions.remove(), is(new SlotStateTransition(slotId, SlotState.PENDING, SlotState.FREE, jobId)));
+	}
+
+	@Test
+	public void testSlotStatusProcessing() {
+		SlotTracker tracker = new DefaultSlotTracker((slot, previous, current, jobId) -> {});
+		SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+		SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1);
+		SlotID slotId3 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 2);
+		tracker.addSlot(slotId1, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+		tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
+		tracker.addSlot(slotId3, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, jobId);
+
+		assertThat(tracker.getFreeSlots(), containsInAnyOrder(Arrays.asList(infoWithSlotId(slotId1), infoWithSlotId(slotId2))));
+
+		// move slot2 to PENDING
+		tracker.notifyAllocationStart(slotId2, jobId);
+
+		tracker.notifySlotStatus(Arrays.asList(
+			new SlotStatus(slotId1, ResourceProfile.ANY, jobId, new AllocationID()),
+			new SlotStatus(slotId2, ResourceProfile.ANY, null, new AllocationID()),
+			new SlotStatus(slotId3, ResourceProfile.ANY, null, new AllocationID())));
+
+		// slot1 should now be allocated; slot2 should continue to be in a pending state; slot3 should be freed
+		assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId3)));
+
+		// if slot2 is not in a pending state, this will fail with an exception
+		tracker.notifyAllocationComplete(slotId2, jobId);
+	}
+
+	/**
+	 * Tests all state transitions that could (or should not) occur due to a slot status update. This test only checks
+	 * the target state and job ID for state transitions, because the slot ID is not interesting and the slot state
+	 * is not *actually* being updated. We assume the reconciler locks in a set of transitions given a source and target
+	 * state, without worrying about the correctness of intermediate steps (because it shouldn't; and it would be a bit
+	 * annoying to setup).
+	 */
+	@Test
+	public void testSlotStatusReconciliation() {
+		JobID jobId1 = new JobID();
+		JobID jobId2 = new JobID();
+
+		Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
+
+		DefaultSlotTracker.SlotStatusStateReconciler reconciler = new DefaultSlotTracker.SlotStatusStateReconciler(
+			slot -> stateTransitions.add(new SlotStateTransition(slot.getSlotId(), slot.getState(), SlotState.FREE, slot.getJobId())),
+			(slot, jobID) -> stateTransitions.add(new SlotStateTransition(slot.getSlotId(), slot.getState(), SlotState.PENDING, jobID)),
+			(slot, jobID) -> stateTransitions.add(new SlotStateTransition(slot.getSlotId(), slot.getState(), SlotState.ALLOCATED, jobID)));
+
+		{// free slot
+			DeclarativeTaskManagerSlot slot = new DeclarativeTaskManagerSlot(new SlotID(ResourceID.generate(), 0), ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION);
+
+			reconciler.executeStateTransition(slot, null);
+			assertThat(stateTransitions, empty());
+
+			reconciler.executeStateTransition(slot, jobId1);
+			assertThat(stateTransitions.remove(), is(transitionWithTargetStateForJob(SlotState.PENDING, jobId1)));
+			assertThat(stateTransitions.remove(), is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId1)));
+		}

Review comment:
       I think I would split these blocks into separate test methods.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Queue;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultSlotTracker}.
+ */
+public class DefaultSlotTrackerTest extends TestLogger {
+
+	private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION = new TaskExecutorConnection(
+		ResourceID.generate(),
+		new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
+
+	private static final JobID jobId = new JobID();
+
+	@Test
+	public void testInitialBehavior() {

Review comment:
       Maybe spell out what the initial behaviour is. For someone who did not write this code, this might not be obvious.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org