You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/28 10:01:54 UTC

[GitHub] [flink] zhuzhurk commented on a diff in pull request #20056: [FLINK-28142][runtime] Enrich TaskManagerLocation with node information

zhuzhurk commented on code in PR #20056:
URL: https://github.com/apache/flink/pull/20056#discussion_r908290749


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java:
##########
@@ -55,6 +56,10 @@ public class TaskExecutorRegistration implements Serializable {
     /** The task executor total resource profile. */
     private final ResourceProfile totalResourceProfile;
 
+    /** ID of the node where the TaskManager is located. */
+    private final String nodeId;

Review Comment:
   This change seems to serve another purpose which is different from adding `nodeId` to `TaskManagerLocation`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java:
##########
@@ -76,20 +84,41 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
      * @param dataPort the port instance's task manager expects to receive transfer envelopes on
      * @param hostNameSupplier the supplier for obtaining fully-qualified domain name and pure
      *     hostname of the task manager
+     * @param nodeId the ID of node where the task manager is located.
      */
     @VisibleForTesting
     public TaskManagerLocation(
             ResourceID resourceID,
             InetAddress inetAddress,
             int dataPort,
-            HostNameSupplier hostNameSupplier) {
+            HostNameSupplier hostNameSupplier,
+            String nodeId) {
         // -1 indicates a local instance connection info
         checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, or -1 (local)");
 
         this.resourceID = checkNotNull(resourceID);
         this.inetAddress = checkNotNull(inetAddress);
         this.dataPort = dataPort;
         this.hostNameSupplier = checkNotNull(hostNameSupplier);
+        this.nodeId = checkNotNull(nodeId);
+    }
+
+    /**
+     * Constructs a new instance connection info object. The constructor will attempt to retrieve
+     * the instance's host name and domain name through the operating system's lookup mechanisms.
+     *
+     * @param inetAddress the network address the instance's task manager binds its sockets to
+     * @param dataPort the port instance's task manager expects to receive transfer envelopes on
+     * @param hostNameSupplier the supplier for obtaining fully-qualified domain name and pure
+     *     hostname of the task manager
+     */
+    @VisibleForTesting
+    public TaskManagerLocation(

Review Comment:
   If this method will not be used in production, I would propose to drop it and rework its current usages (given that there are not many usages).



##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java:
##########
@@ -32,4 +32,15 @@ public class TaskManagerOptionsInternal {
                     .noDefaultValue()
                     .withDescription(
                             "**DO NOT USE** The metadata of TaskManager's ResourceID to be used for logging.");
+
+    /**
+     * The ID of the node where the TaskManager is located. In Yarn and Native Kubernetes mode, this
+     * option will be set by resource manager when launch a container for the task executor. In
+     * other modes, this option will not be set. This option is only used internally.
+     */
+    public static final ConfigOption<String> TASK_MANAGER_NODE_ID =
+            key("internal.taskmanager.node-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("ID of the node where the TaskManager is located.");

Review Comment:
   located -> located in



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java:
##########
@@ -178,12 +178,24 @@ void testMainContainerPorts() {
 
     @Test
     void testMainContainerEnv() {
-        final Map<String, String> expectedEnvVars = new HashMap<>(customizedEnvs);
+        final Map<String, String> envVars = new HashMap<>();
+        this.resultMainContainer
+                .getEnv()
+                .forEach(envVar -> envVars.put(envVar.getName(), envVar.getValue()));
+        this.customizedEnvs.forEach((k, v) -> assertThat(envVars.get(k)).isEqualTo(v));
 
-        final Map<String, String> resultEnvVars =
-                this.resultMainContainer.getEnv().stream()
-                        .collect(Collectors.toMap(EnvVar::getName, EnvVar::getValue));
-        expectedEnvVars.forEach((k, v) -> assertThat(resultEnvVars.get(k)).isEqualTo(v));

Review Comment:
   Why do we need to make the above changes?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java:
##########
@@ -74,17 +68,17 @@
  * and verifies its content.
  */
 @NotThreadSafe
-public class TaskManagerRunnerConfigurationTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   This `ExtendWith` is not needed because it was added as a basic rule of the project.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java:
##########
@@ -55,6 +56,10 @@ public class TaskExecutorRegistration implements Serializable {
     /** The task executor total resource profile. */
     private final ResourceProfile totalResourceProfile;
 
+    /** ID of the node where the TaskManager is located. */
+    private final String nodeId;
+
+    @VisibleForTesting
     public TaskExecutorRegistration(

Review Comment:
   If this method will not be used in production, I would propose to drop it and rework its current usages (given that there are not many usages).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org