You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/07 03:10:36 UTC

[GitHub] [flink] xintongsong commented on a change in pull request #14561: [FLINK-20836] Register TaskManager with total and default slot resource profile in SlotManager

xintongsong commented on a change in pull request #14561:
URL: https://github.com/apache/flink/pull/14561#discussion_r553079447



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -275,7 +275,10 @@ public void processResourceRequirements(ResourceRequirements resourceRequirement
      */
     @Override
     public boolean registerTaskManager(
-            final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+            final TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {

Review comment:
       JavaDoc should be updated accordingly.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -459,7 +459,10 @@ public boolean unregisterSlotRequest(AllocationID allocationId) {
      */
     @Override
     public boolean registerTaskManager(
-            final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+            final TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {

Review comment:
       JavaDoc should be updated accordingly.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -464,7 +464,11 @@ private void stopResourceManagerServices() throws Exception {
                 taskExecutors.get(taskManagerResourceId);
 
         if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
-            if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
+            if (slotManager.registerTaskManager(
+                    workerTypeWorkerRegistration,
+                    slotReport,
+                    workerTypeWorkerRegistration.getTotalResourceProfile(),
+                    workerTypeWorkerRegistration.getDefaultSlotResourceProfile())) {

Review comment:
       It's a bit weird that we have to pass in `workerTypeWorkerRegistration. getTotalResourceProfile()` and `workerTypeWorkerRegistration. getDefaultSlotResourceProfile()` when we have already passed in `workerTypeWorkerRegistration`.
   
   Despite the names, I think the boundary between `TaskExecutorConnection` and `WorkerRegistration` is that, the former contains information needed in `SlotManager` while the latter contains additional information needed in `ResourceManager`. (The name `TaskExecutorConnection` is probably because previously we need nothing more than the IDs and the RPC gateway in `SlotManager`.)
   
   Since the total and default slot resource profiles are only used in `SlotManager`, we probably should move them into `TaskExecutorConnection`. We may also rename the two classes as follows to explicitly suggest their scope of usage.
   * `WorkerRegistration` -> `ResourceManagerWorkerRegistration`
   * `TaskExecutorConnection` -> `SlotManagerWorkerRegistration`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org