You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/01 03:54:01 UTC

[flink] branch master updated (ef07590403a -> eab0a1faf5e)

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from ef07590403a [FLINK-28136][runtime] Implement ExecutionTimeBasedSlowTaskDetector
     new c1e39b6aca7 [hotfix][runtime][tests] Migrate TaskManagerLocationTest, TaskExecutorToResourceManagerConnectionTest and TaskManagerRunnerConfigurationTest to JUnit5
     new 51c139b6018 [FLINK-28142][runtime] Enrich TaskManagerLocation with node information
     new eab0a1faf5e [FLINK-28142][runtime] Enrich TaskExecutorRegistration with node information

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../configuration/TaskManagerOptionsInternal.java  |  11 ++
 .../decorators/InitTaskManagerDecorator.java       |  15 ++-
 .../taskmanager/KubernetesTaskExecutorRunner.java  |  29 ++++-
 .../apache/flink/kubernetes/utils/Constants.java   |   4 +
 .../decorators/InitTaskManagerDecoratorTest.java   |  24 +++-
 .../factory/KubernetesTaskManagerFactoryTest.java  |   2 +-
 .../runtime/resourcemanager/ResourceManager.java   |   3 +-
 .../resourcemanager/TaskExecutorRegistration.java  |  17 ++-
 .../registration/WorkerRegistration.java           |  10 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   3 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |   2 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |   3 +-
 .../TaskManagerServicesConfiguration.java          |  22 +++-
 .../runtime/taskmanager/TaskManagerLocation.java   |  49 ++++++--
 .../taskmanager/UnresolvedTaskManagerLocation.java |  11 +-
 .../ResourceManagerPartitionLifecycleTest.java     |   3 +-
 .../ResourceManagerTaskExecutorTest.java           |   9 +-
 .../resourcemanager/ResourceManagerTest.java       |   3 +-
 .../active/ActiveResourceManagerTest.java          |   3 +-
 ...askExecutorToResourceManagerConnectionTest.java |  46 +++----
 .../TaskManagerRunnerConfigurationTest.java        | 138 ++++++++++++---------
 .../LocalUnresolvedTaskManagerLocation.java        |   2 +-
 .../taskmanager/TaskManagerLocationTest.java       | 114 ++++++++++-------
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  |   3 +
 .../flink/yarn/YarnTaskExecutorRunnerTest.java     |  15 +++
 25 files changed, 385 insertions(+), 156 deletions(-)


[flink] 03/03: [FLINK-28142][runtime] Enrich TaskExecutorRegistration with node information

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eab0a1faf5e7ecf8da641880b8913d49ac19da2b
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Tue Jun 28 22:55:43 2022 +0800

    [FLINK-28142][runtime] Enrich TaskExecutorRegistration with node information
    
    This closes #20056.
---
 .../flink/runtime/resourcemanager/ResourceManager.java  |  3 ++-
 .../resourcemanager/TaskExecutorRegistration.java       | 17 ++++++++++++++++-
 .../registration/WorkerRegistration.java                | 10 +++++++++-
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java |  3 ++-
 .../ResourceManagerPartitionLifecycleTest.java          |  3 ++-
 .../ResourceManagerTaskExecutorTest.java                |  9 ++++++---
 .../runtime/resourcemanager/ResourceManagerTest.java    |  3 ++-
 .../active/ActiveResourceManagerTest.java               |  3 ++-
 .../TaskExecutorToResourceManagerConnectionTest.java    |  7 ++++++-
 9 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c94d6f60a03..c845d997e0d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -942,7 +942,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                             taskExecutorRegistration.getHardwareDescription(),
                             taskExecutorRegistration.getMemoryConfiguration(),
                             taskExecutorRegistration.getTotalResourceProfile(),
-                            taskExecutorRegistration.getDefaultSlotResourceProfile());
+                            taskExecutorRegistration.getDefaultSlotResourceProfile(),
+                            taskExecutorRegistration.getNodeId());
 
             log.info(
                     "Registering TaskManager with ResourceID {} ({}) at ResourceManager",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
index 771084f5ee0..b3b9c51a3f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.HardwareDescription;
@@ -55,6 +56,14 @@ public class TaskExecutorRegistration implements Serializable {
     /** The task executor total resource profile. */
     private final ResourceProfile totalResourceProfile;
 
+    /**
+     * ID of the node where the TaskManager is located on. In Yarn and Native Kubernetes mode, this
+     * value will be set by resource manager when launch this TaskManager(via the config option
+     * {@link TaskManagerOptionsInternal#TASK_MANAGER_NODE_ID}). In other modes, this value will be
+     * the external address of the TaskManager.
+     */
+    private final String nodeId;
+
     public TaskExecutorRegistration(
             final String taskExecutorAddress,
             final ResourceID resourceId,
@@ -63,7 +72,8 @@ public class TaskExecutorRegistration implements Serializable {
             final HardwareDescription hardwareDescription,
             final TaskExecutorMemoryConfiguration memoryConfiguration,
             final ResourceProfile defaultSlotResourceProfile,
-            final ResourceProfile totalResourceProfile) {
+            final ResourceProfile totalResourceProfile,
+            final String nodeId) {
         this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
         this.resourceId = checkNotNull(resourceId);
         this.dataPort = dataPort;
@@ -72,6 +82,7 @@ public class TaskExecutorRegistration implements Serializable {
         this.memoryConfiguration = checkNotNull(memoryConfiguration);
         this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile);
         this.totalResourceProfile = checkNotNull(totalResourceProfile);
+        this.nodeId = checkNotNull(nodeId);
     }
 
     public String getTaskExecutorAddress() {
@@ -105,4 +116,8 @@ public class TaskExecutorRegistration implements Serializable {
     public ResourceProfile getTotalResourceProfile() {
         return totalResourceProfile;
     }
+
+    public String getNodeId() {
+        return nodeId;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index f5e77b4a0f5..79f402e4cd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -43,6 +43,8 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>
 
     private final ResourceProfile defaultSlotResourceProfile;
 
+    private final String nodeId;
+
     public WorkerRegistration(
             TaskExecutorGateway taskExecutorGateway,
             WorkerType worker,
@@ -51,7 +53,8 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>
             HardwareDescription hardwareDescription,
             TaskExecutorMemoryConfiguration memoryConfiguration,
             ResourceProfile totalResourceProfile,
-            ResourceProfile defaultSlotResourceProfile) {
+            ResourceProfile defaultSlotResourceProfile,
+            String nodeId) {
 
         super(worker.getResourceID(), taskExecutorGateway);
 
@@ -62,6 +65,7 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>
         this.memoryConfiguration = Preconditions.checkNotNull(memoryConfiguration);
         this.totalResourceProfile = Preconditions.checkNotNull(totalResourceProfile);
         this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
+        this.nodeId = Preconditions.checkNotNull(nodeId);
     }
 
     public WorkerType getWorker() {
@@ -91,4 +95,8 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>
     public ResourceProfile getTotalResourceProfile() {
         return totalResourceProfile;
     }
+
+    public String getNodeId() {
+        return nodeId;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 55bc617f5b5..a7df344756c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1350,7 +1350,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                         hardwareDescription,
                         memoryConfiguration,
                         taskManagerConfiguration.getDefaultSlotResourceProfile(),
-                        taskManagerConfiguration.getTotalResourceProfile());
+                        taskManagerConfiguration.getTotalResourceProfile(),
+                        unresolvedTaskManagerLocation.getNodeId());
 
         resourceManagerConnection =
                 new TaskExecutorToResourceManagerConnection(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
index c527fdeca31..d66dfb85067 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
@@ -195,7 +195,8 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
                         new TaskExecutorMemoryConfiguration(
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         ResourceProfile.ZERO,
-                        ResourceProfile.ZERO);
+                        ResourceProfile.ZERO,
+                        taskExecutorAddress);
         final CompletableFuture<RegistrationResponse> registrationFuture =
                 resourceManagerGateway.registerTaskExecutor(
                         taskExecutorRegistration, TestingUtils.TIMEOUT);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index f50ecc366f1..9f14d33b1cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -238,7 +238,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
                             new TaskExecutorMemoryConfiguration(
                                     1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                             DEFAULT_SLOT_PROFILE,
-                            DEFAULT_SLOT_PROFILE);
+                            DEFAULT_SLOT_PROFILE,
+                            taskExecutorGateway.getAddress());
 
             CompletableFuture<RegistrationResponse> firstFuture =
                     rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
@@ -307,7 +308,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
                         new TaskExecutorMemoryConfiguration(
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         DEFAULT_SLOT_PROFILE,
-                        DEFAULT_SLOT_PROFILE.multiply(numberSlots));
+                        DEFAULT_SLOT_PROFILE.multiply(numberSlots),
+                        taskExecutorGateway.getAddress());
         final RegistrationResponse registrationResponse =
                 rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get();
         assertThat(registrationResponse, instanceOf(TaskExecutorRegistrationSuccess.class));
@@ -384,7 +386,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
                         new TaskExecutorMemoryConfiguration(
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         DEFAULT_SLOT_PROFILE,
-                        DEFAULT_SLOT_PROFILE),
+                        DEFAULT_SLOT_PROFILE,
+                        taskExecutorAddress),
                 TIMEOUT);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 0564bbc373d..d8a93a948cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -233,7 +233,8 @@ public class ResourceManagerTest extends TestLogger {
                         new TaskExecutorMemoryConfiguration(
                                 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                         ResourceProfile.ZERO,
-                        ResourceProfile.ZERO);
+                        ResourceProfile.ZERO,
+                        taskExecutorAddress);
         final CompletableFuture<RegistrationResponse> registrationFuture =
                 resourceManagerGateway.registerTaskExecutor(
                         taskExecutorRegistration, TestingUtils.TIMEOUT);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 6129f9a34b2..08ba16d314c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -949,7 +949,8 @@ public class ActiveResourceManagerTest extends TestLogger {
                             new HardwareDescription(1, 2L, 3L, 4L),
                             TESTING_CONFIG,
                             ResourceProfile.ZERO,
-                            ResourceProfile.ZERO);
+                            ResourceProfile.ZERO,
+                            resourceID.toString());
 
             return resourceManager
                     .getSelfGateway(ResourceManagerGateway.class)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
index 6e4616f7a09..7f153a175e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
@@ -65,6 +65,8 @@ class TaskExecutorToResourceManagerConnectionTest {
 
     private static final int TASK_MANAGER_JMX_PORT = 23456;
 
+    private static final String TASK_MANAGER_NODE_ID = "local";
+
     private static final HardwareDescription TASK_MANAGER_HARDWARE_DESCRIPTION =
             HardwareDescription.extractFromSystem(Long.MAX_VALUE);
 
@@ -93,6 +95,7 @@ class TaskExecutorToResourceManagerConnectionTest {
                             taskExecutorRegistration.getHardwareDescription();
                     final TaskExecutorMemoryConfiguration actualMemoryConfiguration =
                             taskExecutorRegistration.getMemoryConfiguration();
+                    final String nodeID = taskExecutorRegistration.getNodeId();
 
                     assertThat(actualAddress).isEqualTo(TASK_MANAGER_ADDRESS);
                     assertThat(actualResourceId).isEqualTo(TASK_MANAGER_RESOURCE_ID);
@@ -101,6 +104,7 @@ class TaskExecutorToResourceManagerConnectionTest {
                             .isEqualTo(TASK_MANAGER_HARDWARE_DESCRIPTION);
                     assertThat(actualMemoryConfiguration)
                             .isEqualTo(TASK_MANAGER_MEMORY_CONFIGURATION);
+                    assertThat(nodeID).isEqualTo(TASK_MANAGER_NODE_ID);
 
                     return CompletableFuture.completedFuture(successfulRegistration());
                 });
@@ -135,7 +139,8 @@ class TaskExecutorToResourceManagerConnectionTest {
                         TASK_MANAGER_HARDWARE_DESCRIPTION,
                         TASK_MANAGER_MEMORY_CONFIGURATION,
                         ResourceProfile.ZERO,
-                        ResourceProfile.ZERO);
+                        ResourceProfile.ZERO,
+                        TASK_MANAGER_NODE_ID);
         return new TaskExecutorToResourceManagerConnection(
                 LOGGER,
                 rpcService,


[flink] 02/03: [FLINK-28142][runtime] Enrich TaskManagerLocation with node information

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51c139b601806cc4f4272fb678f4d1bed4cf06ab
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Tue Jun 28 22:53:50 2022 +0800

    [FLINK-28142][runtime] Enrich TaskManagerLocation with node information
---
 .../configuration/TaskManagerOptionsInternal.java  | 11 +++++
 .../decorators/InitTaskManagerDecorator.java       | 15 ++++++-
 .../taskmanager/KubernetesTaskExecutorRunner.java  | 29 ++++++++++++-
 .../apache/flink/kubernetes/utils/Constants.java   |  4 ++
 .../decorators/InitTaskManagerDecoratorTest.java   | 24 +++++++++--
 .../factory/KubernetesTaskManagerFactoryTest.java  |  2 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  2 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  3 +-
 .../TaskManagerServicesConfiguration.java          | 22 +++++++++-
 .../runtime/taskmanager/TaskManagerLocation.java   | 49 ++++++++++++++++++----
 .../taskmanager/UnresolvedTaskManagerLocation.java | 11 ++++-
 .../TaskManagerRunnerConfigurationTest.java        | 35 ++++++++++++++++
 .../LocalUnresolvedTaskManagerLocation.java        |  2 +-
 .../taskmanager/TaskManagerLocationTest.java       | 36 +++++++++++++++-
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  |  3 ++
 .../flink/yarn/YarnTaskExecutorRunnerTest.java     | 15 +++++++
 16 files changed, 241 insertions(+), 22 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java
index 314bb3abd0a..9a0b88202e6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java
@@ -32,4 +32,15 @@ public class TaskManagerOptionsInternal {
                     .noDefaultValue()
                     .withDescription(
                             "**DO NOT USE** The metadata of TaskManager's ResourceID to be used for logging.");
+
+    /**
+     * The ID of the node where the TaskManager is located on. In Yarn and Native Kubernetes mode,
+     * this option will be set by resource manager when launch a container for the task executor. In
+     * other modes, this option will not be set. This option is only used internally.
+     */
+    public static final ConfigOption<String> TASK_MANAGER_NODE_ID =
+            key("internal.taskmanager.node-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("ID of the node where the TaskManager is located on.");
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
index 38ea8315063..09eac71eb3e 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
@@ -31,6 +31,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder;
 import io.fabric8.kubernetes.api.model.ContainerPort;
 import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
 import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.ResourceRequirements;
 
@@ -39,8 +40,11 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.kubernetes.utils.Constants.API_VERSION;
 import static org.apache.flink.kubernetes.utils.Constants.DNS_PLOICY_DEFAULT;
 import static org.apache.flink.kubernetes.utils.Constants.DNS_PLOICY_HOSTNETWORK;
+import static org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_NODE_ID;
+import static org.apache.flink.kubernetes.utils.Constants.POD_NODE_ID_FIELD_PATH;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** An initializer for the TaskManager {@link org.apache.flink.kubernetes.kubeclient.FlinkPod}. */
@@ -149,7 +153,16 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
                 .withResources(resourceRequirements);
 
         // Merge fields
-        mainContainerBuilder.addAllToPorts(getContainerPorts()).addAllToEnv(getCustomizedEnvs());
+        mainContainerBuilder
+                .addAllToPorts(getContainerPorts())
+                .addAllToEnv(getCustomizedEnvs())
+                .addNewEnv()
+                .withName(ENV_FLINK_POD_NODE_ID)
+                .withValueFrom(
+                        new EnvVarSourceBuilder()
+                                .withNewFieldRef(API_VERSION, POD_NODE_ID_FIELD_PATH)
+                                .build())
+                .endEnv();
         getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv);
 
         return mainContainerBuilder.build();
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
index cf95c70db4f..049d1ad2b50 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
@@ -18,14 +18,21 @@
 
 package org.apache.flink.kubernetes.taskmanager;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** This class is the executable entry point for running a TaskExecutor in a Kubernetes pod. */
 public class KubernetesTaskExecutorRunner {
 
@@ -36,6 +43,26 @@ public class KubernetesTaskExecutorRunner {
         SignalHandler.register(LOG);
         JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-        TaskManagerRunner.runTaskManagerProcessSecurely(args);
+        runTaskManagerSecurely(args);
+    }
+
+    private static void runTaskManagerSecurely(String[] args) {
+        Configuration configuration = null;
+
+        try {
+            configuration = TaskManagerRunner.loadConfiguration(args);
+            final String nodeId = System.getenv().get(Constants.ENV_FLINK_POD_NODE_ID);
+            Preconditions.checkState(
+                    nodeId != null,
+                    "The environment variable %s is not set, "
+                            + "which is used to identify the node where the task manager is located.",
+                    Constants.ENV_FLINK_POD_NODE_ID);
+            configuration.setString(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, nodeId);
+        } catch (FlinkParseException fpe) {
+            LOG.error("Could not load the configuration.", fpe);
+            System.exit(TaskManagerRunner.FAILURE_EXIT_CODE);
+        }
+
+        TaskManagerRunner.runTaskManagerProcessSecurely(checkNotNull(configuration));
     }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
index bde37d933cb..7c1828c4562 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
@@ -85,6 +85,10 @@ public class Constants {
 
     public static final String POD_IP_FIELD_PATH = "status.podIP";
 
+    public static final String ENV_FLINK_POD_NODE_ID = "_POD_NODE_ID";
+
+    public static final String POD_NODE_ID_FIELD_PATH = "spec.nodeName";
+
     public static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45;
 
     public static final String RESTART_POLICY_OF_NEVER = "Never";
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
index d4acbb4e30e..f7040e440fa 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
@@ -179,13 +179,29 @@ class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase {
     @Test
     void testMainContainerEnv() {
         final Map<String, String> expectedEnvVars = new HashMap<>(customizedEnvs);
-
-        final Map<String, String> resultEnvVars =
-                this.resultMainContainer.getEnv().stream()
-                        .collect(Collectors.toMap(EnvVar::getName, EnvVar::getValue));
+        final Map<String, String> resultEnvVars = new HashMap<>();
+        this.resultMainContainer
+                .getEnv()
+                .forEach(envVar -> resultEnvVars.put(envVar.getName(), envVar.getValue()));
         expectedEnvVars.forEach((k, v) -> assertThat(resultEnvVars.get(k)).isEqualTo(v));
     }
 
+    @Test
+    void testNodeIdEnv() {
+        assertThat(this.resultMainContainer.getEnv())
+                .anyMatch(
+                        envVar ->
+                                envVar.getName().equals(Constants.ENV_FLINK_POD_NODE_ID)
+                                        && envVar.getValueFrom()
+                                                .getFieldRef()
+                                                .getApiVersion()
+                                                .equals(Constants.API_VERSION)
+                                        && envVar.getValueFrom()
+                                                .getFieldRef()
+                                                .getFieldPath()
+                                                .equals(Constants.POD_NODE_ID_FIELD_PATH));
+    }
+
     @Test
     void testPodName() {
         assertThat(this.resultPod.getMetadata().getName()).isEqualTo(POD_NAME);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
index 01a059fc2c2..f49b8ad996f 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
@@ -86,7 +86,7 @@ class KubernetesTaskManagerFactoryTest extends KubernetesTaskManagerTestBase {
         assertThat(resultMainContainer.getImagePullPolicy())
                 .isEqualTo(CONTAINER_IMAGE_PULL_POLICY.name());
 
-        assertThat(resultMainContainer.getEnv()).hasSize(4);
+        assertThat(resultMainContainer.getEnv()).hasSize(5);
         assertThat(
                         resultMainContainer.getEnv().stream()
                                 .anyMatch(envVar -> envVar.getName().equals("key1")))
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 0762b1558a2..0c2b82ff2b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -113,7 +113,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
     private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;
 
     private static final int SUCCESS_EXIT_CODE = 0;
-    @VisibleForTesting static final int FAILURE_EXIT_CODE = 1;
+    @VisibleForTesting public static final int FAILURE_EXIT_CODE = 1;
 
     private final Thread shutdownHook;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index a02a3b34df7..78c1e282823 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -304,7 +304,8 @@ public class TaskManagerServices {
                         // iff the external data port is not explicitly defined
                         taskManagerServicesConfiguration.getExternalDataPort() > 0
                                 ? taskManagerServicesConfiguration.getExternalDataPort()
-                                : listeningDataPort);
+                                : listeningDataPort,
+                        taskManagerServicesConfiguration.getNodeId());
 
         final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 5f0116b6204..301997a7c7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
@@ -59,6 +60,8 @@ public class TaskManagerServicesConfiguration {
 
     private final String externalAddress;
 
+    private final String nodeId;
+
     private final InetAddress bindAddress;
 
     private final int externalDataPort;
@@ -110,7 +113,8 @@ public class TaskManagerServicesConfiguration {
             Optional<Time> systemResourceMetricsProbingInterval,
             FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
             String[] alwaysParentFirstLoaderPatterns,
-            int numIoThreads) {
+            int numIoThreads,
+            String nodeId) {
         this.configuration = checkNotNull(configuration);
         this.resourceID = checkNotNull(resourceID);
 
@@ -139,6 +143,8 @@ public class TaskManagerServicesConfiguration {
 
         this.systemResourceMetricsProbingInterval =
                 checkNotNull(systemResourceMetricsProbingInterval);
+
+        this.nodeId = checkNotNull(nodeId);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -230,6 +236,10 @@ public class TaskManagerServicesConfiguration {
         return numIoThreads;
     }
 
+    public String getNodeId() {
+        return nodeId;
+    }
+
     // --------------------------------------------------------------------------------------------
     //  Parsing of Flink configuration
     // --------------------------------------------------------------------------------------------
@@ -302,6 +312,13 @@ public class TaskManagerServicesConfiguration {
 
         final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
 
+        // If TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID is not set, use the external address
+        // as the node id.
+        final String nodeId =
+                configuration
+                        .getOptional(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID)
+                        .orElse(externalAddress);
+
         return new TaskManagerServicesConfiguration(
                 configuration,
                 resourceID,
@@ -321,6 +338,7 @@ public class TaskManagerServicesConfiguration {
                 ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
                 FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
                 alwaysParentFirstLoaderPatterns,
-                numIoThreads);
+                numIoThreads,
+                nodeId);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index ce33c8ba7db..36903248548 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.util.NetUtils;
 
@@ -68,6 +69,14 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
      */
     private String stringRepresentation;
 
+    /**
+     * ID of the node where the TaskManager is located on. In Yarn and Native Kubernetes mode, this
+     * value will be set by resource manager when launch this TaskManager(via the config option
+     * {@link TaskManagerOptionsInternal#TASK_MANAGER_NODE_ID}). In other modes, this value will be
+     * the external address of the TaskManager.
+     */
+    private final String nodeId;
+
     /**
      * Constructs a new instance connection info object. The constructor will attempt to retrieve
      * the instance's host name and domain name through the operating system's lookup mechanisms.
@@ -76,13 +85,15 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
      * @param dataPort the port instance's task manager expects to receive transfer envelopes on
      * @param hostNameSupplier the supplier for obtaining fully-qualified domain name and pure
      *     hostname of the task manager
+     * @param nodeId the ID of node where the task manager is located on.
      */
     @VisibleForTesting
     public TaskManagerLocation(
             ResourceID resourceID,
             InetAddress inetAddress,
             int dataPort,
-            HostNameSupplier hostNameSupplier) {
+            HostNameSupplier hostNameSupplier,
+            String nodeId) {
         // -1 indicates a local instance connection info
         checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, or -1 (local)");
 
@@ -90,6 +101,7 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
         this.inetAddress = checkNotNull(inetAddress);
         this.dataPort = dataPort;
         this.hostNameSupplier = checkNotNull(hostNameSupplier);
+        this.nodeId = checkNotNull(nodeId);
     }
 
     /**
@@ -101,7 +113,12 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
      */
     @VisibleForTesting
     public TaskManagerLocation(ResourceID resourceID, InetAddress inetAddress, int dataPort) {
-        this(resourceID, inetAddress, dataPort, new DefaultHostNameSupplier(inetAddress));
+        this(
+                resourceID,
+                inetAddress,
+                dataPort,
+                new DefaultHostNameSupplier(inetAddress),
+                getHostName(inetAddress));
     }
 
     public static TaskManagerLocation fromUnresolvedLocation(
@@ -117,13 +134,15 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
                         unresolvedLocation.getResourceID(),
                         inetAddress,
                         unresolvedLocation.getDataPort(),
-                        new DefaultHostNameSupplier(inetAddress));
+                        new DefaultHostNameSupplier(inetAddress),
+                        unresolvedLocation.getNodeId());
             case USE_IP_ONLY:
                 return new TaskManagerLocation(
                         unresolvedLocation.getResourceID(),
                         inetAddress,
                         unresolvedLocation.getDataPort(),
-                        new IpOnlyHostNameSupplier(inetAddress));
+                        new IpOnlyHostNameSupplier(inetAddress),
+                        unresolvedLocation.getNodeId());
             default:
                 throw new UnsupportedOperationException("Unsupported resolution mode provided.");
         }
@@ -196,6 +215,15 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
         return hostNameSupplier.getHostName();
     }
 
+    /**
+     * Return the ID of node where the task manager is located on.
+     *
+     * @return The ID of node where the task manager is located on.
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
     // --------------------------------------------------------------------------------------------
     // Utilities
     // --------------------------------------------------------------------------------------------
@@ -264,7 +292,8 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
             TaskManagerLocation that = (TaskManagerLocation) obj;
             return this.resourceID.equals(that.resourceID)
                     && this.inetAddress.equals(that.inetAddress)
-                    && this.dataPort == that.dataPort;
+                    && this.dataPort == that.dataPort
+                    && this.nodeId.equals(that.nodeId);
         } else {
             return false;
         }
@@ -272,7 +301,10 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
 
     @Override
     public int hashCode() {
-        return resourceID.hashCode() + 17 * inetAddress.hashCode() + 129 * dataPort;
+        return resourceID.hashCode()
+                + 17 * inetAddress.hashCode()
+                + 129 * dataPort
+                + 257 * nodeId.hashCode();
     }
 
     @Override
@@ -309,9 +341,10 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
             return -1;
         } else if (this.dataPort > o.dataPort) {
             return 1;
-        } else {
-            return 0;
         }
+
+        // finally, decided based on node id
+        return this.nodeId.compareTo(o.nodeId);
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java
index 28bd9155e2e..249ccef1ef2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java
@@ -36,15 +36,20 @@ public class UnresolvedTaskManagerLocation implements Serializable {
     private final ResourceID resourceID;
     private final String externalAddress;
     private final int dataPort;
+    private final String nodeId;
 
     public UnresolvedTaskManagerLocation(
-            final ResourceID resourceID, final String externalAddress, final int dataPort) {
+            final ResourceID resourceID,
+            final String externalAddress,
+            final int dataPort,
+            final String nodeId) {
         // -1 indicates a local instance connection info
         checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, or -1 (local)");
 
         this.resourceID = checkNotNull(resourceID);
         this.externalAddress = checkNotNull(externalAddress);
         this.dataPort = dataPort;
+        this.nodeId = checkNotNull(nodeId);
     }
 
     public ResourceID getResourceID() {
@@ -58,4 +63,8 @@ public class UnresolvedTaskManagerLocation implements Serializable {
     public int getDataPort() {
         return dataPort;
     }
+
+    public String getNodeId() {
+        return nodeId;
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
index 0f021a08c0d..f8b3382947d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
@@ -24,9 +24,12 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.WorkingDirectory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
@@ -47,6 +50,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.URI;
 import java.nio.file.Files;
@@ -214,6 +218,37 @@ class TaskManagerRunnerConfigurationTest {
         assertThat(jmPort).isEqualTo(configuration.getInteger(JobManagerOptions.PORT));
     }
 
+    @Test
+    void testNodeIdShouldBeConfiguredValueIfExplicitlySet() throws Exception {
+        String nodeId = "node1";
+        Configuration configuration = new Configuration();
+        configuration.set(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, nodeId);
+        TaskManagerServicesConfiguration servicesConfiguration =
+                createTaskManagerServiceConfiguration(configuration);
+        assertThat(servicesConfiguration.getNodeId()).isEqualTo(nodeId);
+    }
+
+    @Test
+    void testNodeIdShouldBeExternalAddressIfNotExplicitlySet() throws Exception {
+        TaskManagerServicesConfiguration servicesConfiguration =
+                createTaskManagerServiceConfiguration(new Configuration());
+        assertThat(servicesConfiguration.getNodeId())
+                .isEqualTo(InetAddress.getLocalHost().getHostName());
+    }
+
+    private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(
+            Configuration config) throws Exception {
+        return TaskManagerServicesConfiguration.fromConfiguration(
+                config,
+                ResourceID.generate(),
+                InetAddress.getLocalHost().getHostName(),
+                true,
+                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config),
+                WorkingDirectory.create(
+                        Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString())
+                                .toFile()));
+    }
+
     private static Configuration createFlinkConfigWithPredefinedTaskManagerHostname(
             final String taskmanagerHost) {
         final Configuration config = new Configuration();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java
index 201de612669..365974677ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java
@@ -25,6 +25,6 @@ public class LocalUnresolvedTaskManagerLocation extends UnresolvedTaskManagerLoc
     private static final long serialVersionUID = 1L;
 
     public LocalUnresolvedTaskManagerLocation() {
-        super(ResourceID.generate(), "localhost", 42);
+        super(ResourceID.generate(), "localhost", 42, "localhost");
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
index 3c9fa646e1e..de3e1c7e69f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
@@ -91,6 +91,39 @@ class TaskManagerLocationTest {
         }
     }
 
+    @Test
+    void testEqualsHashAndCompareToWithDifferentNodeId() throws Exception {
+        ResourceID resourceID = ResourceID.generate();
+        InetAddress inetAddress = InetAddress.getByName("1.2.3.4");
+        TaskManagerLocation.HostNameSupplier hostNameSupplier =
+                new TaskManagerLocation.DefaultHostNameSupplier(inetAddress);
+        String nodeId1 = "node1";
+        String nodeId2 = "node2";
+
+        // one == three != two
+        TaskManagerLocation one =
+                new TaskManagerLocation(resourceID, inetAddress, 19871, hostNameSupplier, nodeId1);
+        TaskManagerLocation two =
+                new TaskManagerLocation(resourceID, inetAddress, 19871, hostNameSupplier, nodeId2);
+        TaskManagerLocation three =
+                new TaskManagerLocation(resourceID, inetAddress, 19871, hostNameSupplier, nodeId1);
+
+        assertThat(one).isEqualTo(three);
+        assertThat(one).isNotEqualTo(two);
+        assertThat(two).isNotEqualTo(three);
+
+        assertThat(one.hashCode()).isEqualTo(three.hashCode());
+        assertThat(one.hashCode()).isNotEqualTo(two.hashCode());
+        assertThat(two.hashCode()).isNotEqualTo(three.hashCode());
+
+        assertThat(one.compareTo(three)).isEqualTo(0);
+        assertThat(one.compareTo(two)).isNotEqualTo(0);
+        assertThat(two.compareTo(three)).isNotEqualTo(0);
+
+        int val = one.compareTo(two);
+        assertThat(two.compareTo(one)).isEqualTo(-val);
+    }
+
     @Test
     void testSerialization() {
         try {
@@ -211,7 +244,8 @@ class TaskManagerLocationTest {
                         ResourceID.generate(),
                         address,
                         19871,
-                        new TaskManagerLocation.IpOnlyHostNameSupplier(address));
+                        new TaskManagerLocation.IpOnlyHostNameSupplier(address),
+                        address.getHostAddress());
 
         assertThat("worker10").isNotEqualTo(info.getHostname());
         assertThat("worker10").isNotEqualTo(info.getFQDNHostname());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 6b0a629bf97..f39010a5d9f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -138,6 +139,8 @@ public class YarnTaskExecutorRunner {
                 variables.get(YarnResourceManagerDriver.ENV_FLINK_NODE_ID);
         if (taskExecutorHostname != null) {
             configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname);
+            configuration.setString(
+                    TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, taskExecutorHostname);
         }
     }
 }
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
index 32043040c29..15a7149079f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.security.modules.HadoopModule;
@@ -30,6 +31,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -125,4 +127,17 @@ public class YarnTaskExecutorRunnerTest {
         assertThat(configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL))
                 .isEqualTo("testuser1@domain");
     }
+
+    @Test
+    void testTaskManagerNodeIdConfiguration() throws Exception {
+        final String resourceDirPath =
+                Paths.get("src", "test", "resources").toAbsolutePath().toString();
+        Configuration configuration = new Configuration();
+        YarnTaskExecutorRunner.setupAndModifyConfiguration(
+                configuration,
+                resourceDirPath,
+                Collections.singletonMap(YarnResourceManagerDriver.ENV_FLINK_NODE_ID, "test"));
+        assertThat(configuration.getString(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID))
+                .isEqualTo("test");
+    }
 }


[flink] 01/03: [hotfix][runtime][tests] Migrate TaskManagerLocationTest, TaskExecutorToResourceManagerConnectionTest and TaskManagerRunnerConfigurationTest to JUnit5

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c1e39b6aca7fc0af5ae6370f6dcb644c91881dd1
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Wed Jun 22 20:05:19 2022 +0800

    [hotfix][runtime][tests] Migrate TaskManagerLocationTest, TaskExecutorToResourceManagerConnectionTest and TaskManagerRunnerConfigurationTest to JUnit5
---
 ...askExecutorToResourceManagerConnectionTest.java |  39 ++++----
 .../TaskManagerRunnerConfigurationTest.java        | 103 +++++++++------------
 .../taskmanager/TaskManagerLocationTest.java       |  78 ++++++++--------
 3 files changed, 97 insertions(+), 123 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
index a14752fde1e..6e4616f7a09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java
@@ -32,24 +32,21 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link TaskExecutorToResourceManagerConnection}. */
-public class TaskExecutorToResourceManagerConnectionTest extends TestLogger {
+class TaskExecutorToResourceManagerConnectionTest {
 
     private static final Logger LOGGER =
             LoggerFactory.getLogger(TaskExecutorToResourceManagerConnectionTest.class);
@@ -83,7 +80,7 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger {
     private CompletableFuture<Void> registrationRejectionFuture;
 
     @Test
-    public void testResourceManagerRegistration() throws Exception {
+    void testResourceManagerRegistration() throws Exception {
         final TaskExecutorToResourceManagerConnection resourceManagerRegistration =
                 createTaskExecutorToResourceManagerConnection();
 
@@ -97,13 +94,13 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger {
                     final TaskExecutorMemoryConfiguration actualMemoryConfiguration =
                             taskExecutorRegistration.getMemoryConfiguration();
 
-                    assertThat(actualAddress, is(equalTo(TASK_MANAGER_ADDRESS)));
-                    assertThat(actualResourceId, is(equalTo(TASK_MANAGER_RESOURCE_ID)));
-                    assertThat(actualDataPort, is(equalTo(TASK_MANAGER_DATA_PORT)));
-                    assertThat(
-                            actualHardwareDescription,
-                            is(equalTo(TASK_MANAGER_HARDWARE_DESCRIPTION)));
-                    assertThat(actualMemoryConfiguration, is(TASK_MANAGER_MEMORY_CONFIGURATION));
+                    assertThat(actualAddress).isEqualTo(TASK_MANAGER_ADDRESS);
+                    assertThat(actualResourceId).isEqualTo(TASK_MANAGER_RESOURCE_ID);
+                    assertThat(actualDataPort).isEqualTo(TASK_MANAGER_DATA_PORT);
+                    assertThat(actualHardwareDescription)
+                            .isEqualTo(TASK_MANAGER_HARDWARE_DESCRIPTION);
+                    assertThat(actualMemoryConfiguration)
+                            .isEqualTo(TASK_MANAGER_MEMORY_CONFIGURATION);
 
                     return CompletableFuture.completedFuture(successfulRegistration());
                 });
@@ -113,7 +110,7 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger {
     }
 
     @Test
-    public void testResourceManagerRegistrationIsRejected() {
+    void testResourceManagerRegistrationIsRejected() {
         final TaskExecutorToResourceManagerConnection resourceManagerRegistration =
                 createTaskExecutorToResourceManagerConnection();
 
@@ -157,8 +154,8 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger {
                 new ClusterInformation("blobServerHost", 55555));
     }
 
-    @Before
-    public void setUp() {
+    @BeforeEach
+    void setUp() {
         rpcService = new TestingRpcService();
 
         testingResourceManagerGateway = new TestingResourceManagerGateway();
@@ -168,8 +165,8 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger {
         registrationRejectionFuture = new CompletableFuture<>();
     }
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterEach
+    void tearDown() throws Exception {
         rpcService.stopService().get(TEST_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
index 77a8ab234ca..0f021a08c0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
@@ -34,14 +34,11 @@ import org.apache.flink.runtime.rpc.AddressResolution;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcSystem;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.opentest4j.TestAbortedException;
 import sun.net.util.IPAddressUtil;
 
 import javax.annotation.Nullable;
@@ -52,19 +49,14 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.ServerSocket;
 import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.isEmptyOrNullString;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeNoException;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Validates that the TaskManagerRunner startup properly obeys the configuration values.
@@ -74,17 +66,16 @@ import static org.junit.Assume.assumeNoException;
  * and verifies its content.
  */
 @NotThreadSafe
-public class TaskManagerRunnerConfigurationTest extends TestLogger {
+class TaskManagerRunnerConfigurationTest {
 
     private static final RpcSystem RPC_SYSTEM = RpcSystem.load();
 
     private static final int TEST_TIMEOUT_SECONDS = 10;
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir private Path temporaryFolder;
 
     @Test
-    public void testTaskManagerRpcServiceShouldBindToConfiguredTaskManagerHostname()
-            throws Exception {
+    void testTaskManagerRpcServiceShouldBindToConfiguredTaskManagerHostname() throws Exception {
         final String taskmanagerHost = "testhostname";
         final Configuration config =
                 createFlinkConfigWithPredefinedTaskManagerHostname(taskmanagerHost);
@@ -97,8 +88,8 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
                     TaskManagerRunner.createRpcService(
                             config, highAvailabilityServices, RPC_SYSTEM);
 
-            assertThat(taskManagerRpcService.getPort(), is(greaterThanOrEqualTo(0)));
-            assertThat(taskManagerRpcService.getAddress(), is(equalTo(taskmanagerHost)));
+            assertThat(taskManagerRpcService.getPort()).isGreaterThanOrEqualTo(0);
+            assertThat(taskManagerRpcService.getAddress()).isEqualTo(taskmanagerHost);
         } finally {
             maybeCloseRpcService(taskManagerRpcService);
             highAvailabilityServices.closeAndCleanupAllData();
@@ -106,7 +97,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
     }
 
     @Test
-    public void testTaskManagerRpcServiceShouldBindToHostnameAddress() throws Exception {
+    void testTaskManagerRpcServiceShouldBindToHostnameAddress() throws Exception {
         final Configuration config = createFlinkConfigWithHostBindPolicy(HostBindPolicy.NAME);
         final HighAvailabilityServices highAvailabilityServices =
                 createHighAvailabilityServices(config);
@@ -116,7 +107,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
             taskManagerRpcService =
                     TaskManagerRunner.createRpcService(
                             config, highAvailabilityServices, RPC_SYSTEM);
-            assertThat(taskManagerRpcService.getAddress(), not(isEmptyOrNullString()));
+            assertThat(taskManagerRpcService.getAddress()).isNotNull().isNotEmpty();
         } finally {
             maybeCloseRpcService(taskManagerRpcService);
             highAvailabilityServices.closeAndCleanupAllData();
@@ -124,9 +115,8 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
     }
 
     @Test
-    public void
-            testTaskManagerRpcServiceShouldBindToIpAddressDeterminedByConnectingToResourceManager()
-                    throws Exception {
+    void testTaskManagerRpcServiceShouldBindToIpAddressDeterminedByConnectingToResourceManager()
+            throws Exception {
         final ServerSocket testJobManagerSocket = openServerSocket();
         final Configuration config =
                 createFlinkConfigWithJobManagerPort(testJobManagerSocket.getLocalPort());
@@ -138,7 +128,11 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
             taskManagerRpcService =
                     TaskManagerRunner.createRpcService(
                             config, highAvailabilityServices, RPC_SYSTEM);
-            assertThat(taskManagerRpcService.getAddress(), is(ipAddress()));
+            assertThat(taskManagerRpcService.getAddress())
+                    .matches(
+                            value ->
+                                    (IPAddressUtil.isIPv4LiteralAddress(value)
+                                            || IPAddressUtil.isIPv6LiteralAddress(value)));
         } finally {
             maybeCloseRpcService(taskManagerRpcService);
             highAvailabilityServices.closeAndCleanupAllData();
@@ -147,8 +141,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
     }
 
     @Test
-    public void testCreatingTaskManagerRpcServiceShouldFailIfRpcPortRangeIsInvalid()
-            throws Exception {
+    void testCreatingTaskManagerRpcServiceShouldFailIfRpcPortRangeIsInvalid() throws Exception {
         final Configuration config =
                 new Configuration(
                         createFlinkConfigWithPredefinedTaskManagerHostname("example.org"));
@@ -158,19 +151,23 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
                 createHighAvailabilityServices(config);
 
         try {
-            TaskManagerRunner.createRpcService(config, highAvailabilityServices, RPC_SYSTEM);
-            fail("Should fail because -1 is not a valid port range");
-        } catch (final IllegalArgumentException e) {
-            assertThat(e.getMessage(), containsString("Invalid port range definition: -1"));
+            assertThatThrownBy(
+                            () ->
+                                    TaskManagerRunner.createRpcService(
+                                            config, highAvailabilityServices, RPC_SYSTEM))
+                    .isInstanceOf(IllegalArgumentException.class)
+                    .hasMessage("Invalid port range definition: -1");
         } finally {
             highAvailabilityServices.closeAndCleanupAllData();
         }
     }
 
     @Test
-    public void testDefaultFsParameterLoading() throws Exception {
+    void testDefaultFsParameterLoading() throws Exception {
         try {
-            final File tmpDir = temporaryFolder.newFolder();
+            final File tmpDir =
+                    Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString())
+                            .toFile();
             final File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME);
 
             final URI defaultFS = new URI("otherFS", null, "localhost", 1234, null, null, null);
@@ -183,7 +180,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
             Configuration configuration = TaskManagerRunner.loadConfiguration(args);
             FileSystem.initialize(configuration);
 
-            assertEquals(defaultFS, FileSystem.getDefaultFsUri());
+            assertThat(defaultFS).isEqualTo(FileSystem.getDefaultFsUri());
         } finally {
             // reset FS settings
             FileSystem.initialize(new Configuration());
@@ -191,8 +188,9 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
     }
 
     @Test
-    public void testLoadDynamicalProperties() throws IOException, FlinkParseException {
-        final File tmpDir = temporaryFolder.newFolder();
+    void testLoadDynamicalProperties() throws IOException, FlinkParseException {
+        final File tmpDir =
+                Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()).toFile();
         final File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME);
         final PrintWriter pw1 = new PrintWriter(confFile);
         final long managedMemory = 1024 * 1024 * 256;
@@ -210,11 +208,10 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
                     "-D" + JobManagerOptions.PORT.key() + "=" + jmPort
                 };
         Configuration configuration = TaskManagerRunner.loadConfiguration(args);
-        assertEquals(
-                MemorySize.parse(managedMemory + "b"),
-                configuration.get(TaskManagerOptions.MANAGED_MEMORY_SIZE));
-        assertEquals(jmHost, configuration.get(JobManagerOptions.ADDRESS));
-        assertEquals(jmPort, configuration.getInteger(JobManagerOptions.PORT));
+        assertThat(MemorySize.parse(managedMemory + "b"))
+                .isEqualTo(configuration.get(TaskManagerOptions.MANAGED_MEMORY_SIZE));
+        assertThat(jmHost).isEqualTo(configuration.get(JobManagerOptions.ADDRESS));
+        assertThat(jmPort).isEqualTo(configuration.getInteger(JobManagerOptions.PORT));
     }
 
     private static Configuration createFlinkConfigWithPredefinedTaskManagerHostname(
@@ -255,8 +252,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
         try {
             return new ServerSocket(0);
         } catch (IOException e) {
-            assumeNoException("Skip test because could not open a server socket", e);
-            throw new RuntimeException("satisfy compiler");
+            throw new TestAbortedException("Skip test because could not open a server socket");
         }
     }
 
@@ -266,19 +262,4 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger {
             rpcService.stopService().get(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         }
     }
-
-    private static TypeSafeMatcher<String> ipAddress() {
-        return new TypeSafeMatcher<String>() {
-            @Override
-            protected boolean matchesSafely(String value) {
-                return IPAddressUtil.isIPv4LiteralAddress(value)
-                        || IPAddressUtil.isIPv6LiteralAddress(value);
-            }
-
-            @Override
-            public void describeTo(Description description) {
-                description.appendText("Is an ip address.");
-            }
-        };
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
index 4d7350b2e73..3c9fa646e1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
@@ -21,16 +21,12 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.util.InstantiationUtil;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -38,10 +34,10 @@ import static org.mockito.Mockito.when;
  * Tests for the TaskManagerLocation, which identifies the location and connection information of a
  * TaskManager.
  */
-public class TaskManagerLocationTest {
+class TaskManagerLocationTest {
 
     @Test
-    public void testEqualsHashAndCompareTo() {
+    void testEqualsHashAndCompareTo() {
         try {
             ResourceID resourceID1 = new ResourceID("a");
             ResourceID resourceID2 = new ResourceID("b");
@@ -72,22 +68,22 @@ public class TaskManagerLocationTest {
             TaskManagerLocation three = new TaskManagerLocation(resourceID3, address3, 10871);
             TaskManagerLocation four = new TaskManagerLocation(resourceID1, address1, 19871);
 
-            assertTrue(one.equals(four));
-            assertTrue(!one.equals(two));
-            assertTrue(!one.equals(three));
-            assertTrue(!two.equals(three));
-            assertTrue(!three.equals(four));
+            assertThat(one).isEqualTo(four);
+            assertThat(one).isNotEqualTo(two);
+            assertThat(one).isNotEqualTo(three);
+            assertThat(two).isNotEqualTo(three);
+            assertThat(three).isNotEqualTo(four);
 
-            assertTrue(one.compareTo(four) == 0);
-            assertTrue(four.compareTo(one) == 0);
-            assertTrue(one.compareTo(two) != 0);
-            assertTrue(one.compareTo(three) != 0);
-            assertTrue(two.compareTo(three) != 0);
-            assertTrue(three.compareTo(four) != 0);
+            assertThat(one.compareTo(four)).isEqualTo(0);
+            assertThat(four.compareTo(one)).isEqualTo(0);
+            assertThat(one.compareTo(two)).isNotEqualTo(0);
+            assertThat(one.compareTo(three)).isNotEqualTo(0);
+            assertThat(two.compareTo(three)).isNotEqualTo(0);
+            assertThat(three.compareTo(four)).isNotEqualTo(0);
 
             {
                 int val = one.compareTo(two);
-                assertTrue(two.compareTo(one) == -val);
+                assertThat(two.compareTo(one)).isEqualTo(-val);
             }
         } catch (Exception e) {
             e.printStackTrace();
@@ -96,7 +92,7 @@ public class TaskManagerLocationTest {
     }
 
     @Test
-    public void testSerialization() {
+    void testSerialization() {
         try {
             // without resolved hostname
             {
@@ -105,7 +101,7 @@ public class TaskManagerLocationTest {
                                 ResourceID.generate(), InetAddress.getByName("1.2.3.4"), 8888);
 
                 TaskManagerLocation serCopy = InstantiationUtil.clone(original);
-                assertEquals(original, serCopy);
+                assertThat(original).isEqualTo(serCopy);
             }
 
             // with resolved hostname
@@ -116,7 +112,7 @@ public class TaskManagerLocationTest {
                 original.getFQDNHostname();
 
                 TaskManagerLocation serCopy = InstantiationUtil.clone(original);
-                assertEquals(original, serCopy);
+                assertThat(original).isEqualTo(serCopy);
             }
         } catch (Exception e) {
             e.printStackTrace();
@@ -125,17 +121,17 @@ public class TaskManagerLocationTest {
     }
 
     @Test
-    public void testGetFQDNHostname() {
+    void testGetFQDNHostname() {
         try {
             TaskManagerLocation info1 =
                     new TaskManagerLocation(
                             ResourceID.generate(), InetAddress.getByName("127.0.0.1"), 19871);
-            assertNotNull(info1.getFQDNHostname());
+            assertThat(info1.getFQDNHostname()).isNotNull();
 
             TaskManagerLocation info2 =
                     new TaskManagerLocation(
                             ResourceID.generate(), InetAddress.getByName("1.2.3.4"), 8888);
-            assertNotNull(info2.getFQDNHostname());
+            assertThat(info2.getFQDNHostname()).isNotNull();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -143,7 +139,7 @@ public class TaskManagerLocationTest {
     }
 
     @Test
-    public void testGetHostname0() {
+    void testGetHostname0() {
         try {
             InetAddress address = mock(InetAddress.class);
             when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com");
@@ -152,7 +148,7 @@ public class TaskManagerLocationTest {
 
             final TaskManagerLocation info =
                     new TaskManagerLocation(ResourceID.generate(), address, 19871);
-            Assert.assertEquals("worker2", info.getHostname());
+            assertThat("worker2").isEqualTo(info.getHostname());
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -160,7 +156,7 @@ public class TaskManagerLocationTest {
     }
 
     @Test
-    public void testGetHostname1() {
+    void testGetHostname1() {
         try {
             InetAddress address = mock(InetAddress.class);
             when(address.getCanonicalHostName()).thenReturn("worker10");
@@ -169,7 +165,7 @@ public class TaskManagerLocationTest {
 
             TaskManagerLocation info =
                     new TaskManagerLocation(ResourceID.generate(), address, 19871);
-            Assert.assertEquals("worker10", info.getHostname());
+            assertThat("worker10").isEqualTo(info.getHostname());
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -177,7 +173,7 @@ public class TaskManagerLocationTest {
     }
 
     @Test
-    public void testGetHostname2() {
+    void testGetHostname2() {
         try {
             final String addressString = "192.168.254.254";
 
@@ -192,11 +188,11 @@ public class TaskManagerLocationTest {
             TaskManagerLocation info =
                     new TaskManagerLocation(ResourceID.generate(), address, 54152);
 
-            assertNotNull(info.getFQDNHostname());
-            assertTrue(info.getFQDNHostname().equals(addressString));
+            assertThat(info.getFQDNHostname()).isNotNull();
+            assertThat(info.getFQDNHostname()).isEqualTo(addressString);
 
-            assertNotNull(info.getHostname());
-            assertTrue(info.getHostname().equals(addressString));
+            assertThat(info.getHostname()).isNotNull();
+            assertThat(info.getHostname()).isEqualTo(addressString);
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -204,7 +200,7 @@ public class TaskManagerLocationTest {
     }
 
     @Test
-    public void testNotRetrieveHostName() {
+    void testNotRetrieveHostName() {
         InetAddress address = mock(InetAddress.class);
         when(address.getCanonicalHostName()).thenReturn("worker10");
         when(address.getHostName()).thenReturn("worker10");
@@ -217,9 +213,9 @@ public class TaskManagerLocationTest {
                         19871,
                         new TaskManagerLocation.IpOnlyHostNameSupplier(address));
 
-        assertNotEquals("worker10", info.getHostname());
-        assertNotEquals("worker10", info.getFQDNHostname());
-        assertEquals("127.0.0.1", info.getHostname());
-        assertEquals("127.0.0.1", info.getFQDNHostname());
+        assertThat("worker10").isNotEqualTo(info.getHostname());
+        assertThat("worker10").isNotEqualTo(info.getFQDNHostname());
+        assertThat("127.0.0.1").isEqualTo(info.getHostname());
+        assertThat("127.0.0.1").isEqualTo(info.getFQDNHostname());
     }
 }