You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/06 11:07:12 UTC

[GitHub] [flink] xintongsong commented on a change in pull request #14560: [FLINK-20837] Refactor dynamic SlotID

xintongsong commented on a change in pull request #14560:
URL: https://github.com/apache/flink/pull/14560#discussion_r552509039



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -463,7 +473,7 @@ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
         TaskSlot<T> taskSlot = taskSlots.get(index);
         if (taskSlot != null) {
             return taskSlot.isAllocated(jobId, allocationId);
-        } else if (index < 0) {
+        } else if (index >= numberSlots) {

Review comment:
       If we also insert dynamic slot to the `taskSlot`, we won't need this `else-if` branch anymore.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -95,6 +96,9 @@
     /** The table state. */
     private volatile State state;
 
+    /** Current index for dynamic slot, should always not less than numberSlots */
+    private AtomicInteger dynamicSlotIndex;

Review comment:
       I think `TaskSlotTableImpl` is not designed to be thread-safe, and should always be accessed from the rpc main thread. So we should not need `AtomicInteger` here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -321,6 +325,12 @@ public boolean allocateSlot(
             return false;
         }
 
+        // The negative index indicate that the SlotManger allocate a dynamic slot, we transfer the
+        // index to an increasing number not less than the numberSlots.
+        if (index < 0) {
+            index = nextDynamicSlotIndex();
+        }

Review comment:
       It's quite implicit that the method argument is overwritten in the middle of the method body.
   
   I would suggest the following to convert `index` into a `effectiveIndex` at the beginning of this method. (Or maybe rename the argument to `requestedIndex` and convert it to `index`). Then use the effective index for the rest of the method.
   
   That also means all the `index < 0` checks should be replaced with `index >= numberSlots`. Maybe introduce a util method `isDynamicIndex`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -288,6 +288,11 @@ public boolean allocateSlot(
 
         TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
+            if (index < 0 && taskSlot.isAllocated(jobId, allocationId)) {
+                // If the slot is a dynamic slot with expected jobId and allocationId, it should be
+                // treated as duplicate allocate request.
+                return true;
+            }

Review comment:
       These boolean expressions in the `if` and `return` statements have become quite hard to understand.
   Maybe we can wrap them into separate methods with meaningful names.
   Something like:
   ```
   if (isAllocationIdExist()) {
     return isDuplicateSlot();
   } else if (isSlotIndexTaken()) {
     return false;
   }
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -288,6 +288,11 @@ public boolean allocateSlot(
 
         TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
+            if (index < 0 && taskSlot.isAllocated(jobId, allocationId)) {
+                // If the slot is a dynamic slot with expected jobId and allocationId, it should be
+                // treated as duplicate allocate request.
+                return true;
+            }

Review comment:
       I think this is a reported issue, FLINK-15660.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -329,7 +339,7 @@ public boolean allocateSlot(
                         jobId,
                         allocationId,
                         memoryVerificationExecutor);
-        if (index >= 0) {
+        if (index < numberSlots) {

Review comment:
       Now since the dynamic slots also have unique indexes, we can also insert them into `taskSlots`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org