You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/06 14:18:26 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13544: [FLINK-19309][coordination] Add TaskExecutorManager

tillrohrmann commented on a change in pull request #13544:
URL: https://github.com/apache/flink/pull/13544#discussion_r500299558



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * SlotManager component for all task executor related things.
+ *
+ * <p>Dev note: This component only exists to keep the code out of the slot manager.
+ * It covers many aspects that aren't really the responsibility of the slot manager, and should be refactored to live
+ * outside the slot manager and split into multiple parts.
+ */
+class TaskExecutorManager implements AutoCloseable {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorManager.class);
+
+	private final ResourceProfile defaultSlotResourceProfile;
+
+	/** The default resource spec of workers to request. */
+	private final WorkerResourceSpec defaultWorkerResourceSpec;
+
+	private final int numSlotsPerWorker;
+
+	/** Defines the max limitation of the total number of slots. */
+	private final int maxSlotNum;
+
+	/** Release task executor only when each produced result partition is either consumed or failed. */
+	private final boolean waitResultConsumedBeforeRelease;
+
+	/** Defines the number of redundant taskmanagers. */
+	private final int redundantTaskManagerNum;
+
+	/** Timeout after which an unused TaskManager is released. */
+	private final Time taskManagerTimeout;
+
+	/** Callbacks for resource (de-)allocations. */
+	private final ResourceActions resourceActions;
+
+	/** All currently registered task managers. */
+	private final Map<InstanceID, TaskManagerRegistration> taskManagerRegistrations = new HashMap<>();
+
+	private final Map<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots = new HashMap<>();

Review comment:
       I don't fully understand why the TEM is responsible for the pending slots. This looks more like the responsibility of the `SlotManager` to me.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * SlotManager component for all task executor related things.
+ *
+ * <p>Dev note: This component only exists to keep the code out of the slot manager.
+ * It covers many aspects that aren't really the responsibility of the slot manager, and should be refactored to live
+ * outside the slot manager and split into multiple parts.
+ */
+class TaskExecutorManager implements AutoCloseable {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorManager.class);
+
+	private final ResourceProfile defaultSlotResourceProfile;
+
+	/** The default resource spec of workers to request. */
+	private final WorkerResourceSpec defaultWorkerResourceSpec;

Review comment:
       Why does the `TaskExecutorManager` need to know these values?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link TaskExecutorManager}.
+ */
+public class TaskExecutorManagerTest extends TestLogger {
+
+	/**
+	 * Tests that a pending slot is only fulfilled by an exactly matching received slot.
+	 */
+	@Test
+	public void testPendingSlotNotFulfilledIfProfilesAreNotExactMatch() {
+		final int numWorkerCpuCores = 3;
+		final WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build();
+		final ResourceProfile requestedSlotProfile = ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores).build();
+		final ResourceProfile offeredSlotProfile = ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores - 1).build();
+
+		try (final TaskExecutorManager taskExecutorManager = createTaskExecutorManagerBuilder()
+			.setDefaultWorkerResourceSpec(workerResourceSpec)
+			.setNumSlotsPerWorker(1) // set to one so that the slot profiles directly correspond to the worker spec
+			.setMaxNumSlots(2)
+			.createTaskExecutorManager()) {
+
+			// create pending slot
+			taskExecutorManager.allocateWorker(requestedSlotProfile);
+			assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots(), is(1));
+
+			createAndRegisterTaskExecutor(taskExecutorManager, 1, offeredSlotProfile);
+
+			// the slot from the task executor should be accepted, but we should still be waiting for the originally requested slot
+			assertThat(taskExecutorManager.getNumberRegisteredSlots(), is(1));
+			assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots(), is(1));
+		}
+	}
+
+	/**
+	 * Tests that a pending slot is not fulfilled by an already allocated slot.
+	 */
+	@Test
+	public void testPendingSlotNotFulfilledByAllocatedSlot() {
+		final int numWorkerCpuCores = 3;
+		final WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build();
+		final ResourceProfile requestedSlotProfile = ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores).build();
+
+		try (final TaskExecutorManager taskExecutorManager = createTaskExecutorManagerBuilder()
+			.setDefaultWorkerResourceSpec(workerResourceSpec)
+			.setNumSlotsPerWorker(1) // set to one so that the slot profiles directly correspond to the worker spec
+			.setMaxNumSlots(2)
+			.createTaskExecutorManager()) {
+
+			// create pending slot
+			taskExecutorManager.allocateWorker(requestedSlotProfile);
+			assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots(), is(1));
+
+			final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
+			final SlotReport slotReport = new SlotReport(
+				new SlotStatus(
+					new SlotID(taskExecutorConnection.getResourceID(), 0),
+					requestedSlotProfile,
+					JobID.generate(),
+					new AllocationID()));
+			taskExecutorManager.registerTaskManager(taskExecutorConnection, slotReport);
+
+			// the slot from the task executor should be accepted, but we should still be waiting for the originally requested slot
+			assertThat(taskExecutorManager.getNumberRegisteredSlots(), is(1));
+			assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots(), is(1));
+		}
+	}
+
+	/**
+	 * Tests that a task manager timeout does not remove the slots from the SlotManager.
+	 * A timeout should only trigger the {@link ResourceActions#releaseResource(InstanceID, Exception)}
+	 * callback. The receiver of the callback can then decide what to do with the TaskManager.
+	 *
+	 * <p>See FLINK-7793
+	 */
+	@Test
+	public void testTaskManagerTimeoutDoesNotRemoveSlots1() throws Exception {

Review comment:
       ```suggestion
   	public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * SlotManager component for all task executor related things.

Review comment:
       To make this component easier to understand, it would be helpful to describe what it is responsible for. I don't fully understand why this component is responsible for managing the pending slots and also for deciding whether we have reached the maximum number of slots or not. Judging from its name, I would expect this component to be responsible for managing the TE connections and handling the timeouts of idle TEs.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * SlotManager component for all task executor related things.
+ *
+ * <p>Dev note: This component only exists to keep the code out of the slot manager.
+ * It covers many aspects that aren't really the responsibility of the slot manager, and should be refactored to live
+ * outside the slot manager and split into multiple parts.
+ */
+class TaskExecutorManager implements AutoCloseable {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorManager.class);
+
+	private final ResourceProfile defaultSlotResourceProfile;
+
+	/** The default resource spec of workers to request. */
+	private final WorkerResourceSpec defaultWorkerResourceSpec;
+
+	private final int numSlotsPerWorker;
+
+	/** Defines the max limitation of the total number of slots. */
+	private final int maxSlotNum;
+
+	/** Release task executor only when each produced result partition is either consumed or failed. */
+	private final boolean waitResultConsumedBeforeRelease;
+
+	/** Defines the number of redundant taskmanagers. */
+	private final int redundantTaskManagerNum;
+
+	/** Timeout after which an unused TaskManager is released. */
+	private final Time taskManagerTimeout;
+
+	/** Callbacks for resource (de-)allocations. */
+	private final ResourceActions resourceActions;
+
+	/** All currently registered task managers. */
+	private final Map<InstanceID, TaskManagerRegistration> taskManagerRegistrations = new HashMap<>();
+
+	private final Map<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots = new HashMap<>();
+
+	private final Executor mainThreadExecutor;
+
+	private final ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;
+
+	TaskExecutorManager(
+		WorkerResourceSpec defaultWorkerResourceSpec,
+		int numSlotsPerWorker,
+		int maxNumSlots,
+		boolean waitResultConsumedBeforeRelease,
+		int redundantTaskManagerNum,
+		Time taskManagerTimeout,
+		ScheduledExecutor scheduledExecutor,
+		Executor mainThreadExecutor,
+		ResourceActions resourceActions) {
+
+		this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
+		this.numSlotsPerWorker = numSlotsPerWorker;
+		this.maxSlotNum = maxNumSlots;
+		this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
+		this.redundantTaskManagerNum = redundantTaskManagerNum;
+		this.taskManagerTimeout = taskManagerTimeout;
+		this.defaultSlotResourceProfile = generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
+
+		this.resourceActions = Preconditions.checkNotNull(resourceActions);
+		this.mainThreadExecutor = mainThreadExecutor;
+		taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
+			() -> mainThreadExecutor.execute(
+				this::checkTaskManagerTimeoutsAndRedundancy),
+			0L,
+			taskManagerTimeout.toMilliseconds(),
+			TimeUnit.MILLISECONDS);
+	}
+
+	@VisibleForTesting
+	static ResourceProfile generateDefaultSlotResourceProfile(WorkerResourceSpec workerResourceSpec, int numSlotsPerWorker) {
+		return ResourceProfile.newBuilder()
+			.setCpuCores(workerResourceSpec.getCpuCores().divide(numSlotsPerWorker))
+			.setTaskHeapMemory(workerResourceSpec.getTaskHeapSize().divide(numSlotsPerWorker))
+			.setTaskOffHeapMemory(workerResourceSpec.getTaskOffHeapSize().divide(numSlotsPerWorker))
+			.setManagedMemory(workerResourceSpec.getManagedMemSize().divide(numSlotsPerWorker))
+			.setNetworkMemory(workerResourceSpec.getNetworkMemSize().divide(numSlotsPerWorker))
+			.build();
+	}
+
+	@Override
+	public void close() {
+		taskManagerTimeoutsAndRedundancyCheck.cancel(false);
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// TaskExecutor (un)registration
+	// ---------------------------------------------------------------------------------------------
+
+	public boolean isTaskManagerRegistered(InstanceID instanceId) {
+		return taskManagerRegistrations.containsKey(instanceId);
+	}
+
+	public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+		if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
+			LOG.info("The total number of slots exceeds the max limitation {}, releasing the excess task executor.", maxSlotNum);
+			resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
+			return false;
+		}
+
+		TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
+			taskExecutorConnection,
+			StreamSupport.stream(initialSlotReport.spliterator(), false).map(SlotStatus::getSlotID).collect(Collectors.toList()));
+
+		taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
+
+		// next register the new slots
+		for (SlotStatus slotStatus : initialSlotReport) {
+			if (slotStatus.getJobID() == null) {
+				findAndRemoveExactlyMatchingPendingTaskManagerSlot(slotStatus.getResourceProfile());
+			}
+		}
+
+		return true;
+	}
+
+	private boolean isMaxSlotNumExceededAfterRegistration(SlotReport initialSlotReport) {
+		// check if the total number exceed before matching pending slot.
+		if (!isMaxSlotNumExceededAfterAdding(initialSlotReport.getNumSlotStatus())) {
+			return false;
+		}
+
+		// check if the total number exceed slots after consuming pending slot.
+		return isMaxSlotNumExceededAfterAdding(getNumNonPendingReportedNewSlots(initialSlotReport));
+	}

Review comment:
       Shouldn't this be the concern of the `SlotManager` instead of the `TEM`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org