You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/01/24 02:40:08 UTC

[flink] branch release-1.14 updated: [FLINK-24334][k8s] Set FLINK_LOG_DIR environment for JobManager and TaskManager pod if configured via options

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new eff530f  [FLINK-24334][k8s] Set FLINK_LOG_DIR environment for JobManager and TaskManager pod if configured via options
eff530f is described below

commit eff530f2487e8be07ec0fadbeeca0a95369c59e9
Author: Mrart <mr...@gmail.com>
AuthorDate: Sun Oct 10 18:25:32 2021 +0800

    [FLINK-24334][k8s] Set FLINK_LOG_DIR environment for JobManager and TaskManager pod if configured via options
    
    This closes #18435.
---
 .../generated/kubernetes_config_configuration.html    |  4 ++--
 .../configuration/KubernetesConfigOptions.java        |  5 +++--
 .../decorators/InitJobManagerDecorator.java           |  8 ++++++++
 .../decorators/InitTaskManagerDecorator.java          |  8 ++++++++
 .../parameters/AbstractKubernetesParameters.java      |  4 ++--
 .../kubeclient/parameters/KubernetesParameters.java   |  2 +-
 .../org/apache/flink/kubernetes/utils/Constants.java  |  1 +
 .../decorators/InitJobManagerDecoratorTest.java       | 16 ++++++++++++++++
 .../decorators/InitTaskManagerDecoratorTest.java      | 19 +++++++++++++++++--
 9 files changed, 58 insertions(+), 9 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index 7705535..78d51ae 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -76,9 +76,9 @@
         </tr>
         <tr>
             <td><h5>kubernetes.flink.log.dir</h5></td>
-            <td style="word-wrap: break-word;">"/opt/flink/log"</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>The directory that logs of jobmanager and taskmanager be saved in the pod.</td>
+            <td>The directory that logs of jobmanager and taskmanager be saved in the pod. The default value is $FLINK_HOME/log.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.hadoop.conf.config-map.name</h5></td>
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index b905c8c..a5f61d5 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -265,9 +265,10 @@ public class KubernetesConfigOptions {
     public static final ConfigOption<String> FLINK_LOG_DIR =
             key("kubernetes.flink.log.dir")
                     .stringType()
-                    .defaultValue("/opt/flink/log")
+                    .noDefaultValue()
                     .withDescription(
-                            "The directory that logs of jobmanager and taskmanager be saved in the pod.");
+                            "The directory that logs of jobmanager and taskmanager be saved in the pod. "
+                                    + "The default value is $FLINK_HOME/log.");
 
     public static final ConfigOption<String> HADOOP_CONF_CONFIG_MAP =
             key("kubernetes.hadoop.conf.config-map.name")
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
index cd74fb3..5b3d896 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
@@ -39,6 +39,7 @@ import io.fabric8.kubernetes.api.model.ResourceRequirements;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.kubernetes.utils.Constants.API_VERSION;
@@ -149,6 +150,7 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
                                 .withNewFieldRef(API_VERSION, POD_IP_FIELD_PATH)
                                 .build())
                 .endEnv();
+        getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv);
         return mainContainerBuilder.build();
     }
 
@@ -178,4 +180,10 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
                                         .build())
                 .collect(Collectors.toList());
     }
+
+    private Optional<EnvVar> getFlinkLogDirEnv() {
+        return kubernetesJobManagerParameters
+                .getFlinkLogDirInPod()
+                .map(logDir -> new EnvVar(Constants.ENV_FLINK_LOG_DIR, logDir, null));
+    }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
index ddc1745..69668de 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
@@ -34,6 +34,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.ResourceRequirements;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -144,6 +145,7 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
                                 .withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
                                 .build())
                 .addAllToEnv(getCustomizedEnvs());
+        getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv);
 
         return mainContainerBuilder.build();
     }
@@ -153,4 +155,10 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
                 .map(kv -> new EnvVar(kv.getKey(), kv.getValue(), null))
                 .collect(Collectors.toList());
     }
+
+    private Optional<EnvVar> getFlinkLogDirEnv() {
+        return kubernetesTaskManagerParameters
+                .getFlinkLogDirInPod()
+                .map(logDir -> new EnvVar(Constants.ENV_FLINK_LOG_DIR, logDir, null));
+    }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index 17d0177..2b51c6d 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -131,8 +131,8 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
     }
 
     @Override
-    public String getFlinkLogDirInPod() {
-        return flinkConfig.getString(KubernetesConfigOptions.FLINK_LOG_DIR);
+    public Optional<String> getFlinkLogDirInPod() {
+        return Optional.ofNullable(flinkConfig.getString(KubernetesConfigOptions.FLINK_LOG_DIR));
     }
 
     @Override
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index 47f57c5..c6b3354 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -82,7 +82,7 @@ public interface KubernetesParameters {
     String getFlinkConfDirInPod();
 
     /** Directory in Pod that saves the log files. */
-    String getFlinkLogDirInPod();
+    Optional<String> getFlinkLogDirInPod();
 
     /** The docker entrypoint that starts processes in the container. */
     String getContainerEntrypoint();
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
index 0f1482f..aa3bfa5 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
@@ -27,6 +27,7 @@ public class Constants {
 
     public static final String CONFIG_FILE_LOGBACK_NAME = "logback-console.xml";
     public static final String CONFIG_FILE_LOG4J_NAME = "log4j-console.properties";
+    public static final String ENV_FLINK_LOG_DIR = "FLINK_LOG_DIR";
 
     public static final String MAIN_CONTAINER_NAME = "flink-main-container";
 
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java
index 364765d..e21e582 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java
@@ -67,6 +67,8 @@ public class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
                     new Toleration("NoSchedule", "key1", "Equal", null, "value1"),
                     new Toleration("NoExecute", "key2", "Exists", 6000L, null));
 
+    private static final String USER_DEFINED_FLINK_LOG_DIR = "/path/of/flink-log";
+
     private Pod resultPod;
     private Container resultMainContainer;
 
@@ -81,6 +83,7 @@ public class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
         this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, ANNOTATIONS);
         this.flinkConfig.setString(
                 KubernetesConfigOptions.JOB_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
+        this.flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, USER_DEFINED_FLINK_LOG_DIR);
     }
 
     @Override
@@ -210,4 +213,17 @@ public class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
                 this.resultPod.getSpec().getTolerations(),
                 Matchers.containsInAnyOrder(TOLERATION.toArray()));
     }
+
+    @Test
+    public void testFlinkLogDirEnvShouldBeSetIfConfiguredViaOptions() {
+        final List<EnvVar> envVars = this.resultMainContainer.getEnv();
+        assertThat(
+                envVars.stream()
+                        .anyMatch(
+                                envVar ->
+                                        envVar.getName().equals(Constants.ENV_FLINK_LOG_DIR)
+                                                && envVar.getValue()
+                                                        .equals(USER_DEFINED_FLINK_LOG_DIR)),
+                is(true));
+    }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
index 8e2c38f..546e343 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
@@ -72,6 +72,8 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
     private static final Long RESOURCE_AMOUNT = 2L;
     private static final String RESOURCE_CONFIG_KEY = "test.com/test";
 
+    private static final String USER_DEFINED_FLINK_LOG_DIR = "/path/of/flink-log";
+
     private Pod resultPod;
     private Container resultMainContainer;
 
@@ -98,6 +100,7 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
                         RESOURCE_NAME,
                         KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX),
                 RESOURCE_CONFIG_KEY);
+        this.flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, USER_DEFINED_FLINK_LOG_DIR);
     }
 
     @Override
@@ -177,8 +180,7 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
         final Map<String, String> resultEnvVars =
                 this.resultMainContainer.getEnv().stream()
                         .collect(Collectors.toMap(EnvVar::getName, EnvVar::getValue));
-
-        assertEquals(expectedEnvVars, resultEnvVars);
+        expectedEnvVars.forEach((k, v) -> assertThat(resultEnvVars.get(k), is(v)));
     }
 
     @Test
@@ -235,4 +237,17 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
                 this.resultPod.getSpec().getTolerations(),
                 Matchers.containsInAnyOrder(TOLERATION.toArray()));
     }
+
+    @Test
+    public void testFlinkLogDirEnvShouldBeSetIfConfiguredViaOptions() {
+        final List<EnvVar> envVars = this.resultMainContainer.getEnv();
+        assertThat(
+                envVars.stream()
+                        .anyMatch(
+                                envVar ->
+                                        envVar.getName().equals(Constants.ENV_FLINK_LOG_DIR)
+                                                && envVar.getValue()
+                                                        .equals(USER_DEFINED_FLINK_LOG_DIR)),
+                is(true));
+    }
 }