You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "reswqa (via GitHub)" <gi...@apache.org> on 2023/04/21 18:00:02 UTC

[GitHub] [flink] reswqa commented on a diff in pull request #22424: [FLINK-31842][runtime] Calculate the utilization of the task executor only when it is using

reswqa commented on code in PR #22424:
URL: https://github.com/apache/flink/pull/22424#discussion_r1174029242


##########
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java:
##########
@@ -134,39 +139,57 @@ private Matcher<SlotSelectionStrategy.SlotInfoAndResources> withSlotInfo(SlotInf
                         resourceProfile, Collections.singletonList(nonLocalTm));
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
 
-        Assert.assertTrue(match.isPresent());
-        final SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality = match.get();
-        assertThat(candidates, hasItem(withSlotInfo(slotInfoAndLocality.getSlotInfo())));
-        assertThat(slotInfoAndLocality, hasLocality(Locality.NON_LOCAL));
+        assertThat(match)
+                .hasValueSatisfying(
+                        slotInfoAndLocality -> {
+                            assertThat(slotInfoAndLocality.getLocality())
+                                    .isEqualTo(Locality.NON_LOCAL);
+                            assertThat(candidates)
+                                    .anySatisfy(
+                                            slotInfoAndResources ->
+                                                    assertThat(slotInfoAndResources.getSlotInfo())
+                                                            .isEqualTo(
+                                                                    slotInfoAndLocality
+                                                                            .getSlotInfo()));
+                        });
     }
 
     @Test
-    public void matchPreferredLocation() {
+    void matchPreferredLocation() {
 
         SlotProfile slotProfile =
                 SlotProfileTestingUtils.preferredLocality(
                         biggerResourceProfile, Collections.singletonList(tml2));
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
 
-        Assert.assertEquals(slotInfo2, match.get().getSlotInfo());
+        assertThat(match)
+                .hasValueSatisfying(
+                        slotInfoAndLocality ->
+                                assertThat(slotInfoAndLocality.getSlotInfo()).isEqualTo(slotInfo2));

Review Comment:
   The same logic appears many times in these two test classes; I'd prefer to extract a dedicated method to handle this assertion.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotInfoWithUtilization.java:
##########
@@ -18,26 +18,37 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.function.Function;
+
 /**
  * Container for {@link SlotInfo} and the task executors utilization (freeSlots /
  * totalOfferedSlots).
  */
 public final class SlotInfoWithUtilization implements SlotInfo {
     private final SlotInfo slotInfoDelegate;
-    private final double taskExecutorUtilization;
+    private final Function<ResourceID, Double> taskExecutorUtilizationLookup;
 
-    private SlotInfoWithUtilization(SlotInfo slotInfo, double taskExecutorUtilization) {
+    private SlotInfoWithUtilization(
+            SlotInfo slotInfo, Function<ResourceID, Double> taskExecutorUtilizationLookup) {
         this.slotInfoDelegate = slotInfo;
-        this.taskExecutorUtilization = taskExecutorUtilization;
+        this.taskExecutorUtilizationLookup = taskExecutorUtilizationLookup;
     }
 
+    @VisibleForTesting
     double getTaskExecutorUtilization() {
-        return taskExecutorUtilization;
+        return taskExecutorUtilizationLookup.apply(
+                slotInfoDelegate.getTaskManagerLocation().getResourceID());
+    }

Review Comment:
   It seems that this method still has not been removed. Did you just forget it, or do you have other concerns? 🤔



##########
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java:
##########
@@ -63,67 +54,81 @@ public void testPhysicalSlotResourceProfileRespected() {
                         Collections.emptySet());
 
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
-        Assert.assertTrue(
-                match.get()
-                        .getSlotInfo()
-                        .getResourceProfile()
-                        .isMatching(slotProfile.getPhysicalSlotResourceProfile()));
+        assertThat(match)
+                .hasValueSatisfying(
+                        slotInfoAndLocality ->
+                                assertThat(
+                                                slotInfoAndLocality
+                                                        .getSlotInfo()
+                                                        .getResourceProfile()
+                                                        .isMatching(
+                                                                slotProfile
+                                                                        .getPhysicalSlotResourceProfile()))
+                                        .isTrue());
 
         ResourceProfile evenBiggerResourceProfile =
                 ResourceProfile.fromResources(
                         biggerResourceProfile.getCpuCores().getValue().doubleValue() + 1.0,
                         resourceProfile.getTaskHeapMemory().getMebiBytes());
-        slotProfile =
+        final SlotProfile slotProfileNotMatching =
                 SlotProfile.priorAllocation(
                         resourceProfile,
                         evenBiggerResourceProfile,
                         Collections.emptyList(),
                         Collections.emptyList(),
                         Collections.emptySet());
 
-        match = runMatching(slotProfile);
-        Assert.assertFalse(match.isPresent());
+        match = runMatching(slotProfileNotMatching);
+        assertThat(match).isNotPresent();
     }
 
     @Test
-    public void matchNoRequirements() {
+    void matchNoRequirements() {
 
         SlotProfile slotProfile = SlotProfileTestingUtils.noRequirements();
         Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);
 
-        Assert.assertTrue(match.isPresent());
-        final SlotSelectionStrategy.SlotInfoAndLocality slotInfoAndLocality = match.get();
-        assertThat(candidates, hasItem(withSlotInfo(slotInfoAndLocality.getSlotInfo())));
-        assertThat(slotInfoAndLocality, hasLocality(Locality.UNCONSTRAINED));
+        assertThat(match)
+                .hasValueSatisfying(
+                        slotInfoAndLocality -> {
+                            assertThat(slotInfoAndLocality.getLocality())
+                                    .isEqualTo(Locality.UNCONSTRAINED);
+                            assertThat(candidates)
+                                    .anySatisfy(
+                                            slotInfoAndResources ->
+                                                    assertThat(slotInfoAndResources.getSlotInfo())
+                                                            .isEqualTo(
+                                                                    slotInfoAndLocality
+                                                                            .getSlotInfo()));
+                        });

Review Comment:
   I'd prefer to extract this logic into a dedicated method; this will also be helpful for adding new test cases in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org