Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/20 14:51:13 UTC

[GitHub] [flink] xintongsong commented on a change in pull request #14647: [FLINK-20835] Implement FineGrainedSlotManager

xintongsong commented on a change in pull request #14647:
URL: https://github.com/apache/flink/pull/14647#discussion_r560763453



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorInfo.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+/** Basic resource information of a TaskExecutor. */
+public interface TaskExecutorInfo {

Review comment:
       I think the class name is a bit too general.
   Maybe `TaskManagerResourceView` or `TaskManagerResourceStatus`, to indicate that 1) it is about resources and 2) it provides a view of a changeable status.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -111,6 +111,19 @@
                     .withDescription(
                             "Defines whether the cluster uses declarative resource management.");
 
+    @Documentation.ExcludeFromDocumentation
+    public static final ConfigOption<Boolean> ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT =
+            ConfigOptions.key("cluster.fine-grained-resource-management.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Defines whether the cluster uses fine-grained resource management.");
+
+    public static boolean isFineGrainedResourceManagementEnabled(Configuration configuration) {

Review comment:
       nit: call me captious, but maybe keep fine-grained and declarative in the same order for both the config options and the methods.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener);
+
+    // ---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+

Review comment:
       Should there be a method for removing a pending task executor? Say the worker is not registered and is no longer needed.
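
A minimal sketch of what such a removal method could look like, assuming pending task executors are tracked by some id. The class `PendingTaskExecutorsSketch`, the method `removePendingTaskExecutor`, and the use of plain strings instead of `ResourceProfile` are all hypothetical and not part of the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: tracks pending task executors and allows dropping them again. */
final class PendingTaskExecutorsSketch {
    // Pending id -> total resource profile; plain strings stand in for Flink's types.
    private final Map<String, String> pendingById = new LinkedHashMap<>();
    private int nextId;

    /** Records a requested, not yet registered, task executor and returns its id. */
    String addPendingTaskExecutor(String totalResourceProfile) {
        String pendingId = "pending-" + nextId++;
        pendingById.put(pendingId, totalResourceProfile);
        return pendingId;
    }

    /** Drops a pending task executor that is no longer needed; false if the id is unknown. */
    boolean removePendingTaskExecutor(String pendingId) {
        return pendingById.remove(pendingId) != null;
    }

    int getNumberOfPendingTaskExecutors() {
        return pendingById.size();
    }
}
```

Returning a boolean lets the caller notice a removal request for a worker that has already registered or was never tracked.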

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener);
+
+    // ---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Utility method
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();
+
+    /**
+     * Check if there is a registered task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return whether there is a registered task executor with the given instance id
+     */
+    boolean isTaskExecutorRegistered(InstanceID instanceId);
+
+    /**
+     * Find an exactly matching pending task executor with the given resource profile.
+     *
+     * @param initialSlotReport of the task executor
+     * @param totalResourceProfile of the task executor
+     * @param defaultResourceProfile of the task executor
+     * @return An Optional of {@link PendingTaskManager}, if find, of the matching pending task
+     *     executor.
+     */
+    Optional<PendingTaskManager> findMatchingPendingTaskManager(
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultResourceProfile);
+
+    // ---------------------------------------------------------------------------------------------
+    // TaskExecutor idleness / redundancy
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get all task executors which idle exceed the given timeout period
+     *
+     * @param taskManagerTimeout timeout period
+     * @return a map of timeout task executors' connection index by the instance id
+     */
+    Map<InstanceID, TaskExecutorConnection> getTimeOutTaskExecutors(Time taskManagerTimeout);

Review comment:
       This method should not belong to this interface.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorAllocationStrategy.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.slots.ResourceCounter;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Strategy how to allocate new task executors to fulfill the unfulfilled requirements. */
+public interface TaskExecutorAllocationStrategy {
+    TaskExecutorAllocationStrategy NO_OP_STRATEGY =
+            (requirements, existingPendingResources) -> Collections.emptyMap();
+
+    /**
+     * Calculate {@link PendingTaskManager}s needed to fulfill the given requirements.
+     *
+     * @param requirements requirements indexed by jobId
+     * @param existingPendingResources existing pending resources can be used to fulfill requirement
+     * @return {@link PendingTaskManager}s needed and whether all the requirements can be fulfilled
+     *     after allocation of pending task executors, indexed by jobId
+     */
+    Map<JobID, Tuple2<Map<PendingTaskManager, Integer>, Boolean>> getTaskExecutorsToFulfill(

Review comment:
       How do we guarantee that `TaskExecutorMatchingStrategy` matches slots to pending TMs in the same way that `TaskExecutorAllocationStrategy` calculates the needed TMs?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtils.java
##########
@@ -0,0 +1,36 @@
+/*

Review comment:
       The commit message is a bit misleading.
   It sounds like a new util class is implemented, which should not be a hotfix, while the commit actually only moves some existing code for deduplication.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** {@link TaskExecutorMatchingStrategy} which picks the first matching task executor. */
+public enum AnyMatchingTaskExecutorMatchingStrategy implements TaskExecutorMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Optional<InstanceID> findMatchingTaskExecutor(
+            ResourceProfile requirement,
+            Map<InstanceID, ? extends TaskExecutorInfo> taskExecutors) {
+        return taskExecutors.entrySet().stream()
+                .filter(taskExecutor -> canFulfillRequirement(requirement, taskExecutor.getValue()))
+                .findFirst()
+                .map(Map.Entry::getKey);
+    }
+
+    private boolean canFulfillRequirement(

Review comment:
       can be static
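
A small sketch of the suggestion, assuming the helper uses no instance state. The enum `FirstMatchStrategySketch`, the integer "resources", and the `>=` comparison are hypothetical stand-ins; the body of the real `canFulfillRequirement` is not shown in the diff.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the strategy enum; integers replace ResourceProfile. */
enum FirstMatchStrategySketch {
    INSTANCE;

    /** Returns the key of the first entry whose available amount covers the requirement. */
    Optional<String> findMatching(int required, Map<String, Integer> availableById) {
        return availableById.entrySet().stream()
                .filter(entry -> canFulfill(required, entry.getValue()))
                .findFirst()
                .map(Map.Entry::getKey);
    }

    // Reads only its arguments and no instance state, so it can be declared static.
    private static boolean canFulfill(int required, int available) {
        return available >= required;
    }
}
```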

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {

Review comment:
       My gut feeling is that this interface contains too many methods, and not all of them are closely related. They could be further grouped and structured.
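
One possible way to group the methods, sketched under the assumption that registration and slot status updates are separable concerns. The interface names `TaskExecutorRegistrySketch` and `SlotStatusSyncerSketch`, the class `InMemoryTrackerSketch`, and the string ids are hypothetical; the PR may structure this differently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical narrow interface: registration concerns only. */
interface TaskExecutorRegistrySketch {
    void addTaskExecutor(String instanceId);

    void removeTaskExecutor(String instanceId);

    boolean isTaskExecutorRegistered(String instanceId);
}

/** Hypothetical narrow interface: slot allocation notifications only. */
interface SlotStatusSyncerSketch {
    void notifyAllocationStart(String allocationId, String instanceId);

    void notifyFree(String allocationId);
}

/** A single implementation can still serve both narrow interfaces. */
final class InMemoryTrackerSketch implements TaskExecutorRegistrySketch, SlotStatusSyncerSketch {
    private final Set<String> registered = new HashSet<>();
    private final Map<String, String> instanceByAllocation = new HashMap<>();

    @Override
    public void addTaskExecutor(String instanceId) {
        registered.add(instanceId);
    }

    @Override
    public void removeTaskExecutor(String instanceId) {
        registered.remove(instanceId);
        // Forget allocations that lived on the removed task executor.
        instanceByAllocation.values().removeIf(instanceId::equals);
    }

    @Override
    public boolean isTaskExecutorRegistered(String instanceId) {
        return registered.contains(instanceId);
    }

    @Override
    public void notifyAllocationStart(String allocationId, String instanceId) {
        instanceByAllocation.put(allocationId, instanceId);
    }

    @Override
    public void notifyFree(String allocationId) {
        instanceByAllocation.remove(allocationId);
    }

    int getNumberOfAllocations() {
        return instanceByAllocation.size();
    }
}
```

Callers that only register or remove task executors can then depend on the registry interface alone.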

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
##########
@@ -28,6 +30,10 @@
 
     SlotID getSlotId();
 
+    AllocationID getAllocationId();
+
+    JobID getJobId();

Review comment:
       These two should be marked `@Nullable`
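
A sketch of the annotated getters, presumably motivated by a free slot having neither an allocation nor a job. To stay free of dependencies, the `Nullable` annotation below is a local stand-in for `javax.annotation.Nullable`; `SlotInformationSketch`, `SlotSketch`, and the string ids are hypothetical.

```java
import java.lang.annotation.Documented;

/** Local stand-in for javax.annotation.Nullable so the sketch compiles on its own. */
@Documented
@interface Nullable {}

/** Hypothetical, trimmed-down slot view; strings replace SlotID, AllocationID and JobID. */
interface SlotInformationSketch {
    String getSlotId();

    /** Assumed to be null while the slot is free. */
    @Nullable
    String getAllocationId();

    /** Assumed to be null while the slot is free. */
    @Nullable
    String getJobId();
}

final class SlotSketch implements SlotInformationSketch {
    private final String slotId;
    @Nullable private final String allocationId;
    @Nullable private final String jobId;

    SlotSketch(String slotId, @Nullable String allocationId, @Nullable String jobId) {
        this.slotId = slotId;
        this.allocationId = allocationId;
        this.jobId = jobId;
    }

    @Override
    public String getSlotId() {
        return slotId;
    }

    @Override
    public String getAllocationId() {
        return allocationId;
    }

    @Override
    public String getJobId() {
        return jobId;
    }
}
```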

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategyTest.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the {@link AnyMatchingTaskExecutorMatchingStrategy}. */
+public class AnyMatchingTaskExecutorMatchingStrategyTest extends TestLogger {
+    private final InstanceID instanceIdOfLargeTaskExecutor = new InstanceID();
+    private final InstanceID instanceIdOfSmallTaskExecutor = new InstanceID();
+    private final ResourceProfile largeResourceProfile = ResourceProfile.fromResources(10.2, 42);
+    private final ResourceProfile smallResourceProfile = ResourceProfile.fromResources(1, 1);

Review comment:
       static

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener);
+
+    // ---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(

Review comment:
       This method should not belong to this interface.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
##########
@@ -134,11 +134,13 @@ public static TaskExecutorResourceSpec resourceSpecFromConfigForLocalExecution(
     public static Configuration adjustForLocalExecution(Configuration config) {
         UNUSED_CONFIG_OPTIONS.forEach(option -> warnOptionHasNoEffectIfSet(config, option));
 
-        setConfigOptionToPassedMaxIfNotSet(config, TaskManagerOptions.CPU_CORES, Double.MAX_VALUE);
+        setConfigOptionToPassedMaxIfNotSet(config, TaskManagerOptions.CPU_CORES, 1000000.0);
         setConfigOptionToPassedMaxIfNotSet(
-                config, TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.MAX_VALUE);
+                config, TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(1024 * 1024));
         setConfigOptionToPassedMaxIfNotSet(
-                config, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.MAX_VALUE);
+                config,
+                TaskManagerOptions.TASK_OFF_HEAP_MEMORY,
+                MemorySize.ofMebiBytes(1024 * 1024));

Review comment:
       Let's replace the magic numbers with static constants.
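
A sketch of how the literals from the diff could be named. The class `LocalExecutionDefaults` and both constant names are hypothetical; only the values `1000000.0` and `1024 * 1024` MiB come from the change itself.

```java
/** Hypothetical holder for the local-execution defaults; names are illustrative only. */
final class LocalExecutionDefaults {
    /** CPU cores assumed for local execution when the option is not set. */
    static final double DEFAULT_CPU_CORES = 1_000_000.0;

    /** Task heap and off-heap size in mebibytes: 1024 * 1024 MiB, i.e. 1 TiB. */
    static final long DEFAULT_TASK_MEMORY_MEBIBYTES = 1024L * 1024L;

    private LocalExecutionDefaults() {}

    /** Converts the mebibyte default to bytes. */
    static long defaultTaskMemoryBytes() {
        return DEFAULT_TASK_MEMORY_MEBIBYTES * 1024L * 1024L;
    }
}
```

With named constants, the heap and off-heap calls can share one value, and the chosen size is documented in a single place.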

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -111,6 +111,19 @@
                     .withDescription(
                             "Defines whether the cluster uses declarative resource management.");
 
+    @Documentation.ExcludeFromDocumentation
+    public static final ConfigOption<Boolean> ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT =
+            ConfigOptions.key("cluster.fine-grained-resource-management.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Defines whether the cluster uses fine-grained resource management.");
+
+    public static boolean isFineGrainedResourceManagementEnabled(Configuration configuration) {
+        return isDeclarativeResourceManagementEnabled(configuration)

Review comment:
       IIUC, we need to bind fine-grained to declarative because, in the first step, we implement the feature based on the declarative protocol, which also requires the declarative slot pool to be used. Once FLINK-20838 is finished, we would be able to support both protocols and would no longer need this binding.
   
   If this is the case, I would suggest adding a `TODO` here explaining why this is temporarily needed and when it should be removed.
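
A sketch of where such a `TODO` could sit. A plain `Map` stands in for Flink's `Configuration`, the class `ClusterOptionsSketch` is hypothetical, the declarative key is an assumption (it does not appear in the diff), and the `false` defaults are placeholders; the part of the real method after `isDeclarativeResourceManagementEnabled(configuration)` is not shown in the diff.

```java
import java.util.Map;

/** Hypothetical stand-in for ClusterOptions; a Map replaces Flink's Configuration. */
final class ClusterOptionsSketch {
    // Assumed key for the declarative option; not taken from the diff.
    static final String DECLARATIVE_KEY = "cluster.declarative-resource-management.enabled";
    // Key as it appears in the diff.
    static final String FINE_GRAINED_KEY = "cluster.fine-grained-resource-management.enabled";

    private ClusterOptionsSketch() {}

    static boolean isDeclarativeResourceManagementEnabled(Map<String, String> configuration) {
        return Boolean.parseBoolean(configuration.getOrDefault(DECLARATIVE_KEY, "false"));
    }

    static boolean isFineGrainedResourceManagementEnabled(Map<String, String> configuration) {
        // TODO: Fine-grained resource management is currently implemented on top of the
        // declarative protocol and therefore requires it to be enabled. Remove this
        // binding once FLINK-20838 is finished and both protocols are supported.
        return isDeclarativeResourceManagementEnabled(configuration)
                && Boolean.parseBoolean(configuration.getOrDefault(FINE_GRAINED_KEY, "false"));
    }
}
```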

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener);
+
+    // ---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Utility method
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();
+
+    /**
+     * Check if there is a registered task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return whether there is a registered task executor with the given instance id
+     */
+    boolean isTaskExecutorRegistered(InstanceID instanceId);
+
+    /**
+     * Find an exactly matching pending task executor with the given resource profile.
+     *
+     * @param initialSlotReport of the task executor
+     * @param totalResourceProfile of the task executor
+     * @param defaultResourceProfile of the task executor
+     * @return An Optional of {@link PendingTaskManager}, if find, of the matching pending task
+     *     executor.
+     */
+    Optional<PendingTaskManager> findMatchingPendingTaskManager(
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultResourceProfile);
+
+    // ---------------------------------------------------------------------------------------------
+    // TaskExecutor idleness / redundancy
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get all task executors which idle exceed the given timeout period
+     *
+     * @param taskManagerTimeout timeout period
+     * @return a map of timeout task executors' connection index by the instance id
+     */
+    Map<InstanceID, TaskExecutorConnection> getTimeOutTaskExecutors(Time taskManagerTimeout);
+
+    /**
+     * Get the start time of idleness of the task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return the start time of idleness
+     */
+    long getTaskExecutorIdleSince(InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // slot / resource counts
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the total registered resources.
+     *
+     * @return the total registered resources
+     */
+    ResourceProfile getTotalRegisteredResources();
+
+    /**
+     * Get the total registered resources of the given task executor
+     *
+     * @param instanceId of the task executor
+     * @return the total registered resources of the given task executor
+     */
+    ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceId);

Review comment:
       The following methods could be aggregated into something like `TMStatus getTMStatus(InstanceID instanceId)`, where `TMStatus` (maybe another name) is a data structure containing all the information needed.
   - ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceId)
   - ResourceProfile getTotalFreeResourcesOf(InstanceID instanceId)
   - int getNumberRegisteredSlotsOf(InstanceID instanceId)
   - int getNumberFreeSlotsOf(InstanceID instanceId)
   - long getTaskExecutorIdleSince(InstanceID instanceId)
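
   A minimal sketch of what such an aggregated accessor might look like. All names here (`TaskManagerStatus`, `TrackerSketch`, `SimpleResource`) are hypothetical, and plain strings stand in for `InstanceID` so that the example compiles without Flink on the classpath; the real code would use `ResourceProfile` and `InstanceID`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Flink's ResourceProfile, reduced to two dimensions.
final class SimpleResource {
    final double cpuCores;
    final int memoryMb;

    SimpleResource(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

// Hypothetical immutable snapshot bundling the five per-task-executor values.
final class TaskManagerStatus {
    private final SimpleResource totalResource;
    private final SimpleResource freeResource;
    private final int numberRegisteredSlots;
    private final int numberFreeSlots;
    private final long idleSince;

    TaskManagerStatus(
            SimpleResource totalResource,
            SimpleResource freeResource,
            int numberRegisteredSlots,
            int numberFreeSlots,
            long idleSince) {
        this.totalResource = totalResource;
        this.freeResource = freeResource;
        this.numberRegisteredSlots = numberRegisteredSlots;
        this.numberFreeSlots = numberFreeSlots;
        this.idleSince = idleSince;
    }

    SimpleResource getTotalResource() { return totalResource; }
    SimpleResource getFreeResource() { return freeResource; }
    int getNumberRegisteredSlots() { return numberRegisteredSlots; }
    int getNumberFreeSlots() { return numberFreeSlots; }
    long getIdleSince() { return idleSince; }
}

// Hypothetical tracker exposing one lookup instead of five separate getters.
final class TrackerSketch {
    private final Map<String, TaskManagerStatus> statusByInstanceId = new HashMap<>();

    void update(String instanceId, TaskManagerStatus status) {
        statusByInstanceId.put(instanceId, status);
    }

    // Empty if no task executor is registered under the given id.
    Optional<TaskManagerStatus> getTaskManagerStatus(String instanceId) {
        return Optional.ofNullable(statusByInstanceId.get(instanceId));
    }
}
```

   Returning an `Optional` is one possible way to handle unknown instance ids; whether the interface should do that or throw is a separate design decision.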

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskExecutorInfo.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+public class TestingTaskExecutorInfo implements TaskExecutorInfo {
+    private final ResourceProfile totalResource;
+    private final ResourceProfile availableResource;
+    private final ResourceProfile defaultSlotResourceProfile;
+    private final int numberAllocatedSlots;
+
+    private TestingTaskExecutorInfo(
+            ResourceProfile totalResource,
+            ResourceProfile availableResource,
+            ResourceProfile defaultSlotResourceProfile,
+            int numberAllocatedSlots) {
+        this.totalResource = totalResource;
+        this.availableResource = availableResource;
+        this.defaultSlotResourceProfile = defaultSlotResourceProfile;
+        this.numberAllocatedSlots = numberAllocatedSlots;
+    }
+
+    @Override
+    public ResourceProfile getTotalResource() {
+        return totalResource;
+    }
+
+    @Override
+    public ResourceProfile getDefaultSlotResourceProfile() {
+        return defaultSlotResourceProfile;
+    }
+
+    @Override
+    public ResourceProfile getAvailableResource() {
+        return availableResource;
+    }
+
+    @Override
+    public int getNumberAllocatedSlots() {
+        return numberAllocatedSlots;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private ResourceProfile totalResource = ResourceProfile.ANY;
+        private ResourceProfile availableResource = ResourceProfile.ANY;
+        private ResourceProfile defaultSlotResourceProfile = ResourceProfile.ANY;
+        private int numberAllocatedSlots = 0;
+
+        private Builder() {};
+
+        public Builder withTotalResource(ResourceProfile totalResource) {
+            this.totalResource = totalResource;
+            return this;
+        }
+
+        public Builder withAvailableResource(ResourceProfile availableResource) {
+            this.availableResource = availableResource;
+            return this;
+        }
+
+        public Builder withDefaultSlotResourceProfile(ResourceProfile defaultSlotResourceProfile) {
+            this.defaultSlotResourceProfile = defaultSlotResourceProfile;
+            return this;
+        }
+
+        public Builder withNumberAllocatedSlots(int numberAllocatedSlots) {
+            this.numberAllocatedSlots = numberAllocatedSlots;
+            return this;
+        }

Review comment:
       Better to add sanity checks for the setters:
   - non-null
   - available <= total
   - default <= total
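
   A rough sketch of how these checks could be arranged. `MemoryResource`, `InfoSketch` and `InfoBuilderSketch` are hypothetical stand-ins so that the example is self-contained; in Flink the checks would more likely use `Preconditions.checkNotNull` and `Preconditions.checkArgument` against `ResourceProfile`. The sketch assumes the null checks sit in the setters while the two comparisons are deferred to `build()`, since the setters can be called in any order.

```java
import java.util.Objects;

// Hypothetical one-dimensional stand-in for Flink's ResourceProfile.
final class MemoryResource {
    final int memoryMb;

    MemoryResource(int memoryMb) {
        this.memoryMb = memoryMb;
    }

    // True if this resource is no larger than the given one.
    boolean fitsInto(MemoryResource other) {
        return memoryMb <= other.memoryMb;
    }
}

// Hypothetical result type standing in for TestingTaskExecutorInfo.
final class InfoSketch {
    final MemoryResource totalResource;
    final MemoryResource availableResource;
    final MemoryResource defaultSlotResource;

    InfoSketch(MemoryResource total, MemoryResource available, MemoryResource defaultSlot) {
        this.totalResource = total;
        this.availableResource = available;
        this.defaultSlotResource = defaultSlot;
    }
}

final class InfoBuilderSketch {
    // "Unlimited" defaults, playing the role of ResourceProfile.ANY.
    private MemoryResource totalResource = new MemoryResource(Integer.MAX_VALUE);
    private MemoryResource availableResource = new MemoryResource(Integer.MAX_VALUE);
    private MemoryResource defaultSlotResource = new MemoryResource(Integer.MAX_VALUE);

    InfoBuilderSketch withTotalResource(MemoryResource totalResource) {
        this.totalResource = Objects.requireNonNull(totalResource, "totalResource");
        return this;
    }

    InfoBuilderSketch withAvailableResource(MemoryResource availableResource) {
        this.availableResource = Objects.requireNonNull(availableResource, "availableResource");
        return this;
    }

    InfoBuilderSketch withDefaultSlotResource(MemoryResource defaultSlotResource) {
        this.defaultSlotResource =
                Objects.requireNonNull(defaultSlotResource, "defaultSlotResource");
        return this;
    }

    InfoSketch build() {
        // Relational checks live here because setters may be called in any order.
        checkArgument(availableResource.fitsInto(totalResource), "available must not exceed total");
        checkArgument(
                defaultSlotResource.fitsInto(totalResource), "default slot must not exceed total");
        return new InfoSketch(totalResource, availableResource, defaultSlotResource);
    }

    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

   One consequence of unlimited defaults: a caller that narrows only the total would fail the checks in `build()`, so the other two values have to be set as well in that case.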

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusUpdateListener.java
##########
@@ -42,4 +45,24 @@
      */
     void notifySlotStatusChange(
             TaskManagerSlotInformation slot, SlotState previous, SlotState current, JobID jobId);
+
+    class MultiSlotStatusUpdateListener implements SlotStatusUpdateListener {

Review comment:
       IIUC, this change is to allow `MultiSlotStatusUpdateListener` to be reused by classes other than `DefaultSlotTracker`.
   In that case, I would suggest moving `MultiSlotStatusUpdateListener` into a separate file. An interface does not normally have an inner class that implements itself.
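
   For illustration, a standalone composite listener could look roughly like the sketch below. `StatusListener` and `MultiStatusListener` are hypothetical, simplified stand-ins: the real `notifySlotStatusChange` takes `TaskManagerSlotInformation`, `SlotState` and `JobID` arguments, replaced here by strings so the example has no Flink dependency.

```java
import java.util.ArrayList;
import java.util.Collection;

// Hypothetical, simplified stand-in for SlotStatusUpdateListener.
interface StatusListener {
    void notifySlotStatusChange(String slotId, String previous, String current);
}

// Top-level composite: forwards every notification to all registered listeners,
// in registration order.
final class MultiStatusListener implements StatusListener {
    private final Collection<StatusListener> listeners = new ArrayList<>();

    void registerListener(StatusListener listener) {
        listeners.add(listener);
    }

    @Override
    public void notifySlotStatusChange(String slotId, String previous, String current) {
        for (StatusListener listener : listeners) {
            listener.notifySlotStatusChange(slotId, previous, current);
        }
    }
}
```

   As a top-level class it can be shared by any tracker in the package, and the interface file stays free of implementations.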

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Strategy how to find a matching task executor. */
+public interface TaskExecutorMatchingStrategy {
+
+    /**
+     * Finds a matching task executor for the requested {@link ResourceProfile} given the collection
+     * of task executors.
+     *
+     * @param requirement to find a matching task executor for
+     * @param taskExecutors candidates
+     * @return Returns the instance id of matching task executor or {@link Optional#empty()} if
+     *     there is none
+     */
+    Optional<InstanceID> findMatchingTaskExecutor(
+            ResourceProfile requirement, Map<InstanceID, ? extends TaskExecutorInfo> taskExecutors);

Review comment:
       Why do we need a map for this interface? The strategy should not need to look up a `TaskExecutorInfo` by its `InstanceID`.
   It might be better to include the `InstanceID` in `TaskExecutorInfo` and pass in a list or collection.
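
   As a sketch of the suggested shape, assuming the info object carries its own id: `ExecutorInfo`, `MatchingStrategySketch` and `AnyMatchingSketch` are hypothetical names, a string stands in for `InstanceID`, and a single memory value stands in for `ResourceProfile`.

```java
import java.util.Collection;
import java.util.Optional;

// Hypothetical stand-in for TaskExecutorInfo that carries its own instance id.
final class ExecutorInfo {
    final String instanceId;
    final int availableMemoryMb;

    ExecutorInfo(String instanceId, int availableMemoryMb) {
        this.instanceId = instanceId;
        this.availableMemoryMb = availableMemoryMb;
    }
}

// Hypothetical strategy signature: candidates in, id of a matching candidate out.
interface MatchingStrategySketch {
    Optional<String> findMatchingTaskExecutor(
            int requiredMemoryMb, Collection<? extends ExecutorInfo> taskExecutors);
}

// Picks any candidate with enough available memory; no ordering is promised.
final class AnyMatchingSketch implements MatchingStrategySketch {
    @Override
    public Optional<String> findMatchingTaskExecutor(
            int requiredMemoryMb, Collection<? extends ExecutorInfo> taskExecutors) {
        return taskExecutors.stream()
                .filter(info -> info.availableMemoryMb >= requiredMemoryMb)
                .map(info -> info.instanceId)
                .findAny();
    }
}
```

   With the id inside the info object, the strategy only iterates over the candidates and never needs a keyed lookup.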

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategyTest.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the {@link AnyMatchingTaskExecutorMatchingStrategy}. */
+public class AnyMatchingTaskExecutorMatchingStrategyTest extends TestLogger {
+    private final InstanceID instanceIdOfLargeTaskExecutor = new InstanceID();
+    private final InstanceID instanceIdOfSmallTaskExecutor = new InstanceID();
+    private final ResourceProfile largeResourceProfile = ResourceProfile.fromResources(10.2, 42);
+    private final ResourceProfile smallResourceProfile = ResourceProfile.fromResources(1, 1);
+    private Map<InstanceID, TaskExecutorInfo> taskExecutors = null;
+    private Set<InstanceID> candidates = null;
+
+    @Before
+    public void setup() {
+        taskExecutors = new HashMap<>();
+        candidates = new HashSet<>();
+        taskExecutors.put(
+                instanceIdOfSmallTaskExecutor,
+                TestingTaskExecutorInfo.newBuilder()
+                        .withAvailableResource(smallResourceProfile)
+                        .build());
+        taskExecutors.put(
+                instanceIdOfLargeTaskExecutor,
+                TestingTaskExecutorInfo.newBuilder()
+                        .withAvailableResource(largeResourceProfile)
+                        .build());

Review comment:
       Why do we need this `setup()`? We can also make `taskExecutors` a constant.
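
   One way to read this suggestion, sketched with hypothetical names and simplified types (strings for `InstanceID`, integers for the available resource): the fixture is built once and exposed as an unmodifiable constant, so no `@Before` method is needed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class StrategyTestFixtureSketch {
    // Stand-ins for the InstanceID keys used in the test.
    static final String SMALL_TASK_EXECUTOR = "small";
    static final String LARGE_TASK_EXECUTOR = "large";

    // Built once; unmodifiable, so tests cannot leak state into each other.
    static final Map<String, Integer> TASK_EXECUTORS = createTaskExecutors();

    private StrategyTestFixtureSketch() {}

    private static Map<String, Integer> createTaskExecutors() {
        Map<String, Integer> executors = new HashMap<>();
        executors.put(SMALL_TASK_EXECUTOR, 1);
        executors.put(LARGE_TASK_EXECUTOR, 42);
        return Collections.unmodifiableMap(executors);
    }
}
```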

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
##########
@@ -278,30 +277,4 @@ public void executeStateTransition(DeclarativeTaskManagerSlot slot, JobID jobId)
             }
         }
     }
-
-    private static class MultiSlotStatusUpdateListener implements SlotStatusUpdateListener {

Review comment:
       This should also be a hotfix commit.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** {@link TaskExecutorMatchingStrategy} which picks the first matching task executor. */
+public enum AnyMatchingTaskExecutorMatchingStrategy implements TaskExecutorMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Optional<InstanceID> findMatchingTaskExecutor(
+            ResourceProfile requirement,
+            Map<InstanceID, ? extends TaskExecutorInfo> taskExecutors) {
+        return taskExecutors.entrySet().stream()
+                .filter(taskExecutor -> canFulfillRequirement(requirement, taskExecutor.getValue()))
+                .findFirst()

Review comment:
       `findFirst` -> `findAny`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastAllocateSlotsTaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * {@link TaskExecutorMatchingStrategy} which picks a matching TaskExecutor with the least number of
+ * allocated slots.
+ */
+public enum LeastAllocateSlotsTaskExecutorMatchingStrategy implements TaskExecutorMatchingStrategy {

Review comment:
       I believe this is for the configuration option `cluster.evenly-spread-out-slots`.
   
   However, I'm not sure what this feature should look like for fine-grained resource management, where the amount of resources plays a more important role than the number of slots.
   
   We may need to revisit how this feature is exposed so that it is compatible with both coarse-grained and fine-grained resource management. Given that this PR is already quite big, I would suggest not introducing this strategy in this PR and creating a follow-up issue instead.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategyTest.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the {@link AnyMatchingTaskExecutorMatchingStrategy}. */
+public class AnyMatchingTaskExecutorMatchingStrategyTest extends TestLogger {
+    private final InstanceID instanceIdOfLargeTaskExecutor = new InstanceID();
+    private final InstanceID instanceIdOfSmallTaskExecutor = new InstanceID();
+    private final ResourceProfile largeResourceProfile = ResourceProfile.fromResources(10.2, 42);
+    private final ResourceProfile smallResourceProfile = ResourceProfile.fromResources(1, 1);
+    private Map<InstanceID, TaskExecutorInfo> taskExecutors = null;
+    private Set<InstanceID> candidates = null;

Review comment:
       Never used.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
##########
@@ -134,11 +134,13 @@ public static TaskExecutorResourceSpec resourceSpecFromConfigForLocalExecution(
     public static Configuration adjustForLocalExecution(Configuration config) {
         UNUSED_CONFIG_OPTIONS.forEach(option -> warnOptionHasNoEffectIfSet(config, option));
 
-        setConfigOptionToPassedMaxIfNotSet(config, TaskManagerOptions.CPU_CORES, Double.MAX_VALUE);
+        setConfigOptionToPassedMaxIfNotSet(config, TaskManagerOptions.CPU_CORES, 1000000.0);
         setConfigOptionToPassedMaxIfNotSet(
-                config, TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.MAX_VALUE);
+                config, TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(1024 * 1024));
         setConfigOptionToPassedMaxIfNotSet(
-                config, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.MAX_VALUE);
+                config,
+                TaskManagerOptions.TASK_OFF_HEAP_MEMORY,
+                MemorySize.ofMebiBytes(1024 * 1024));

Review comment:
       And it would be helpful to explain the purpose of this change in the commit message.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener);
+
+    // ---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Utility method
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();
+
+    /**
+     * Check if there is a registered task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return whether there is a registered task executor with the given instance id
+     */
+    boolean isTaskExecutorRegistered(InstanceID instanceId);
+
+    /**
+     * Find an exactly matching pending task executor with the given resource profile.
+     *
+     * @param initialSlotReport of the task executor
+     * @param totalResourceProfile of the task executor
+     * @param defaultResourceProfile of the task executor
+     * @return An Optional of {@link PendingTaskManager}, if find, of the matching pending task
+     *     executor.
+     */
+    Optional<PendingTaskManager> findMatchingPendingTaskManager(

Review comment:
       This method should not belong to this interface.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##########
@@ -70,6 +71,16 @@ public static WorkerResourceSpec fromTaskExecutorProcessSpec(
                 taskExecutorProcessSpec.getManagedMemorySize());
     }
 
+    public static WorkerResourceSpec fromResourceProfile(final ResourceProfile resourceProfile) {

Review comment:
       Again, the commit message should explain the purpose of this change.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Strategy how to find a matching task executor. */
+public interface TaskExecutorMatchingStrategy {

Review comment:
       I would suggest `SlotTaskExecutorMatchingStrategy`, to indicate that the strategy matches a slot to a TM rather than two TMs to each other.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** {@link TaskExecutorMatchingStrategy} which picks the first matching task executor. */
+public enum AnyMatchingTaskExecutorMatchingStrategy implements TaskExecutorMatchingStrategy {

Review comment:
       JavaDoc: first -> any

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener);
+
+    // ---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);

Review comment:
       The slot status synchronization logic is a bit too complicated to be included in this interface. It might be better to have a dedicated slot status synchronizer. The synchronizer can take the slot manager actions (allocate, free), the received slot reports, and the current status as inputs, and notify the `TaskExecutorTracker` and `SlotStatusUpdateListener` as outputs.
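   A rough sketch of what such a synchronizer could look like, only to illustrate the idea. All names here (`SlotStatusSynchronizer`, `SlotStateListener`, the `String` ids standing in for `AllocationID`) are hypothetical and not part of this PR, and a real version would track slots per task manager rather than globally.
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   /** Hypothetical sketch, not the PR's API. String ids stand in for AllocationID. */
   final class SlotStatusSyncSketch {

       enum SlotState { FREE, PENDING, ALLOCATED }

       /** Output side: stand-in for the tracker and SlotStatusUpdateListener. */
       interface SlotStateListener {
           void onStateChange(String allocationId, SlotState previous, SlotState current);
       }

       static final class SlotStatusSynchronizer {
           // Current status; allocation ids that are absent are considered FREE.
           private final Map<String, SlotState> states = new HashMap<>();
           private final List<SlotStateListener> listeners = new ArrayList<>();

           void registerListener(SlotStateListener listener) {
               listeners.add(listener);
           }

           // Inputs: actions taken by the slot manager.
           void allocationStarted(String allocationId) {
               transition(allocationId, SlotState.PENDING);
           }

           void allocationCompleted(String allocationId) {
               transition(allocationId, SlotState.ALLOCATED);
           }

           void freed(String allocationId) {
               transition(allocationId, SlotState.FREE);
           }

           // Input: allocation ids that a received slot report lists as allocated.
           void reportReceived(Set<String> reportedAllocated) {
               for (String id : reportedAllocated) {
                   transition(id, SlotState.ALLOCATED);
               }
               // ALLOCATED slots missing from the report have been freed on the TM.
               // PENDING slots are kept, since their request may still be in flight.
               for (String id : new ArrayList<>(states.keySet())) {
                   if (states.get(id) == SlotState.ALLOCATED && !reportedAllocated.contains(id)) {
                       transition(id, SlotState.FREE);
                   }
               }
           }

           private void transition(String allocationId, SlotState next) {
               SlotState previous = states.getOrDefault(allocationId, SlotState.FREE);
               if (previous == next) {
                   return; // nothing changed, so nobody is notified
               }
               if (next == SlotState.FREE) {
                   states.remove(allocationId);
               } else {
                   states.put(allocationId, next);
               }
               for (SlotStateListener listener : listeners) {
                   listener.onStateChange(allocationId, previous, next);
               }
           }
       }
   }
   ```
   With something along these lines, the tracker would only consume the resulting state transitions instead of interpreting slot reports itself.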

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##########
@@ -70,6 +71,16 @@ public static WorkerResourceSpec fromTaskExecutorProcessSpec(
                 taskExecutorProcessSpec.getManagedMemorySize());
     }
 
+    public static WorkerResourceSpec fromResourceProfile(final ResourceProfile resourceProfile) {

Review comment:
       To avoid confusion with the slot resource profile.
   ```suggestion
       public static WorkerResourceSpec fromTotalResourceProfile(final ResourceProfile totalResourceProfile) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener);
+
+    // ---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Utility method
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();

Review comment:
       Maybe return a collection of `TMInfo` rather than only `InstanceID`s.
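   Roughly what I mean, as a sketch only. `TaskManagerInfo`, its getters, and the `Tracker` class below are hypothetical stand-ins (with `String` and `double` in place of `InstanceID` and `ResourceProfile`), not the types in this PR.
   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;

   /** Hypothetical sketch, not the PR's API. */
   final class TaskManagerInfoSketch {

       /** Read-only view of one registered task manager. */
       interface TaskManagerInfo {
           String getInstanceId(); // stand-in for InstanceID

           double getTotalCpu(); // stand-in for the total ResourceProfile

           double getAvailableCpu(); // stand-in for the available ResourceProfile
       }

       static final class Tracker {
           private final Map<String, TaskManagerInfo> registered = new LinkedHashMap<>();

           void add(String instanceId, double totalCpu, double availableCpu) {
               registered.put(
                       instanceId,
                       new TaskManagerInfo() {
                           @Override
                           public String getInstanceId() {
                               return instanceId;
                           }

                           @Override
                           public double getTotalCpu() {
                               return totalCpu;
                           }

                           @Override
                           public double getAvailableCpu() {
                               return availableCpu;
                           }
                       });
           }

           // Callers get the info objects directly and need no follow-up lookup by id.
           Collection<TaskManagerInfo> getTaskManagers() {
               return Collections.unmodifiableCollection(registered.values());
           }
       }
   }
   ```
   Callers that only need the ids can still map over the collection, while the others avoid one extra query per task manager.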

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
##########
@@ -28,6 +30,10 @@
 
     SlotID getSlotId();
 
+    AllocationID getAllocationId();
+
+    JobID getJobId();

Review comment:
       And this looks like a pure minor refactor. Shouldn't it be a hotfix commit?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener slotStatusUpdateListener);
+
+    // ---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // Utility method
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();
+
+    /**
+     * Check if there is a registered task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return whether there is a registered task executor with the given instance id
+     */
+    boolean isTaskExecutorRegistered(InstanceID instanceId);
+
+    /**
+     * Find an exactly matching pending task executor with the given resource profile.
+     *
+     * @param initialSlotReport of the task executor
+     * @param totalResourceProfile of the task executor
+     * @param defaultResourceProfile of the task executor
+     * @return An Optional of {@link PendingTaskManager}, if find, of the matching pending task
+     *     executor.
+     */
+    Optional<PendingTaskManager> findMatchingPendingTaskManager(
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultResourceProfile);
+
+    // ---------------------------------------------------------------------------------------------
+    // TaskExecutor idleness / redundancy
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get all task executors which idle exceed the given timeout period
+     *
+     * @param taskManagerTimeout timeout period
+     * @return a map of timeout task executors' connection index by the instance id
+     */
+    Map<InstanceID, TaskExecutorConnection> getTimeOutTaskExecutors(Time taskManagerTimeout);
+
+    /**
+     * Get the start time of idleness of the task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return the start time of idleness
+     */
+    long getTaskExecutorIdleSince(InstanceID instanceId);
+
+    // ---------------------------------------------------------------------------------------------
+    // slot / resource counts
+    // ---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the total registered resources.
+     *
+     * @return the total registered resources
+     */
+    ResourceProfile getTotalRegisteredResources();

Review comment:
       The following methods can be aggregated into something like `StatusOverview getStatusOverview()`, where `StatusOverview` (maybe under another name) is a data structure containing all the information needed.
   - ResourceProfile getTotalRegisteredResources()
   - ResourceProfile getTotalFreeResources()
   - int getNumberRegisteredSlots()
   - int getNumberFreeSlots()
   - int getNumberFreeTaskExecutors()
   - int getNumberRegisteredTaskExecutors()
   - int getNumberPendingTaskExecutors()
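   Something along these lines, as a sketch. `StatusOverview` is only the placeholder name from above, and `ResourceProfileStandIn` is a hypothetical simplification used here so the example does not depend on Flink's `ResourceProfile`.
   ```java
   /** Hypothetical sketch, not the PR's API. */
   final class StatusOverviewSketch {

       /** Simplified stand-in for ResourceProfile. */
       static final class ResourceProfileStandIn {
           final double cpuCores;
           final long memoryBytes;

           ResourceProfileStandIn(double cpuCores, long memoryBytes) {
               this.cpuCores = cpuCores;
               this.memoryBytes = memoryBytes;
           }
       }

       /** Immutable snapshot of everything the seven getters expose today. */
       static final class StatusOverview {
           private final ResourceProfileStandIn totalRegisteredResources;
           private final ResourceProfileStandIn totalFreeResources;
           private final int numberRegisteredSlots;
           private final int numberFreeSlots;
           private final int numberFreeTaskExecutors;
           private final int numberRegisteredTaskExecutors;
           private final int numberPendingTaskExecutors;

           StatusOverview(
                   ResourceProfileStandIn totalRegisteredResources,
                   ResourceProfileStandIn totalFreeResources,
                   int numberRegisteredSlots,
                   int numberFreeSlots,
                   int numberFreeTaskExecutors,
                   int numberRegisteredTaskExecutors,
                   int numberPendingTaskExecutors) {
               this.totalRegisteredResources = totalRegisteredResources;
               this.totalFreeResources = totalFreeResources;
               this.numberRegisteredSlots = numberRegisteredSlots;
               this.numberFreeSlots = numberFreeSlots;
               this.numberFreeTaskExecutors = numberFreeTaskExecutors;
               this.numberRegisteredTaskExecutors = numberRegisteredTaskExecutors;
               this.numberPendingTaskExecutors = numberPendingTaskExecutors;
           }

           ResourceProfileStandIn getTotalRegisteredResources() { return totalRegisteredResources; }
           ResourceProfileStandIn getTotalFreeResources() { return totalFreeResources; }
           int getNumberRegisteredSlots() { return numberRegisteredSlots; }
           int getNumberFreeSlots() { return numberFreeSlots; }
           int getNumberFreeTaskExecutors() { return numberFreeTaskExecutors; }
           int getNumberRegisteredTaskExecutors() { return numberRegisteredTaskExecutors; }
           int getNumberPendingTaskExecutors() { return numberPendingTaskExecutors; }
       }

       /** The tracker would then expose one method instead of seven. */
       interface Tracker {
           StatusOverview getStatusOverview();
       }
   }
   ```
   Besides shrinking the interface, a single snapshot gives callers a consistent set of numbers taken at one point in time.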

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Strategy how to find a matching task executor. */
+public interface TaskExecutorMatchingStrategy {

Review comment:
       Moreover, the latest glossary no longer uses the term `TaskExecutor`.
   It's not necessary to change the existing code, but for newly introduced code I would suggest replacing `TaskExecutor` with `TaskManager`.



