You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/01/24 02:40:08 UTC
[flink] branch release-1.14 updated: [FLINK-24334][k8s] Set FLINK_LOG_DIR environment for JobManager and TaskManager pod if configured via options
This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new eff530f [FLINK-24334][k8s] Set FLINK_LOG_DIR environment for JobManager and TaskManager pod if configured via options
eff530f is described below
commit eff530f2487e8be07ec0fadbeeca0a95369c59e9
Author: Mrart <mr...@gmail.com>
AuthorDate: Sun Oct 10 18:25:32 2021 +0800
[FLINK-24334][k8s] Set FLINK_LOG_DIR environment for JobManager and TaskManager pod if configured via options
This closes #18435.
---
.../generated/kubernetes_config_configuration.html | 4 ++--
.../configuration/KubernetesConfigOptions.java | 5 +++--
.../decorators/InitJobManagerDecorator.java | 8 ++++++++
.../decorators/InitTaskManagerDecorator.java | 8 ++++++++
.../parameters/AbstractKubernetesParameters.java | 4 ++--
.../kubeclient/parameters/KubernetesParameters.java | 2 +-
.../org/apache/flink/kubernetes/utils/Constants.java | 1 +
.../decorators/InitJobManagerDecoratorTest.java | 16 ++++++++++++++++
.../decorators/InitTaskManagerDecoratorTest.java | 19 +++++++++++++++++--
9 files changed, 58 insertions(+), 9 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index 7705535..78d51ae 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -76,9 +76,9 @@
</tr>
<tr>
<td><h5>kubernetes.flink.log.dir</h5></td>
- <td style="word-wrap: break-word;">"/opt/flink/log"</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>The directory that logs of jobmanager and taskmanager be saved in the pod.</td>
+ <td>The directory that logs of jobmanager and taskmanager be saved in the pod. The default value is $FLINK_HOME/log.</td>
</tr>
<tr>
<td><h5>kubernetes.hadoop.conf.config-map.name</h5></td>
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index b905c8c..a5f61d5 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -265,9 +265,10 @@ public class KubernetesConfigOptions {
public static final ConfigOption<String> FLINK_LOG_DIR =
key("kubernetes.flink.log.dir")
.stringType()
- .defaultValue("/opt/flink/log")
+ .noDefaultValue()
.withDescription(
- "The directory that logs of jobmanager and taskmanager be saved in the pod.");
+ "The directory that logs of jobmanager and taskmanager be saved in the pod. "
+ + "The default value is $FLINK_HOME/log.");
public static final ConfigOption<String> HADOOP_CONF_CONFIG_MAP =
key("kubernetes.hadoop.conf.config-map.name")
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
index cd74fb3..5b3d896 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
@@ -39,6 +39,7 @@ import io.fabric8.kubernetes.api.model.ResourceRequirements;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.kubernetes.utils.Constants.API_VERSION;
@@ -149,6 +150,7 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
.withNewFieldRef(API_VERSION, POD_IP_FIELD_PATH)
.build())
.endEnv();
+ getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv);
return mainContainerBuilder.build();
}
@@ -178,4 +180,10 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
.build())
.collect(Collectors.toList());
}
+
+ private Optional<EnvVar> getFlinkLogDirEnv() {
+ return kubernetesJobManagerParameters
+ .getFlinkLogDirInPod()
+ .map(logDir -> new EnvVar(Constants.ENV_FLINK_LOG_DIR, logDir, null));
+ }
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
index ddc1745..69668de 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
@@ -34,6 +34,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -144,6 +145,7 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
.withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
.build())
.addAllToEnv(getCustomizedEnvs());
+ getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv);
return mainContainerBuilder.build();
}
@@ -153,4 +155,10 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
.map(kv -> new EnvVar(kv.getKey(), kv.getValue(), null))
.collect(Collectors.toList());
}
+
+ private Optional<EnvVar> getFlinkLogDirEnv() {
+ return kubernetesTaskManagerParameters
+ .getFlinkLogDirInPod()
+ .map(logDir -> new EnvVar(Constants.ENV_FLINK_LOG_DIR, logDir, null));
+ }
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index 17d0177..2b51c6d 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -131,8 +131,8 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
}
@Override
- public String getFlinkLogDirInPod() {
- return flinkConfig.getString(KubernetesConfigOptions.FLINK_LOG_DIR);
+ public Optional<String> getFlinkLogDirInPod() {
+ return Optional.ofNullable(flinkConfig.getString(KubernetesConfigOptions.FLINK_LOG_DIR));
}
@Override
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index 47f57c5..c6b3354 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -82,7 +82,7 @@ public interface KubernetesParameters {
String getFlinkConfDirInPod();
/** Directory in Pod that saves the log files. */
- String getFlinkLogDirInPod();
+ Optional<String> getFlinkLogDirInPod();
/** The docker entrypoint that starts processes in the container. */
String getContainerEntrypoint();
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
index 0f1482f..aa3bfa5 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
@@ -27,6 +27,7 @@ public class Constants {
public static final String CONFIG_FILE_LOGBACK_NAME = "logback-console.xml";
public static final String CONFIG_FILE_LOG4J_NAME = "log4j-console.properties";
+ public static final String ENV_FLINK_LOG_DIR = "FLINK_LOG_DIR";
public static final String MAIN_CONTAINER_NAME = "flink-main-container";
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java
index 364765d..e21e582 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java
@@ -67,6 +67,8 @@ public class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
new Toleration("NoSchedule", "key1", "Equal", null, "value1"),
new Toleration("NoExecute", "key2", "Exists", 6000L, null));
+ private static final String USER_DEFINED_FLINK_LOG_DIR = "/path/of/flink-log";
+
private Pod resultPod;
private Container resultMainContainer;
@@ -81,6 +83,7 @@ public class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, ANNOTATIONS);
this.flinkConfig.setString(
KubernetesConfigOptions.JOB_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
+ this.flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, USER_DEFINED_FLINK_LOG_DIR);
}
@Override
@@ -210,4 +213,17 @@ public class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
this.resultPod.getSpec().getTolerations(),
Matchers.containsInAnyOrder(TOLERATION.toArray()));
}
+
+ @Test
+ public void testFlinkLogDirEnvShouldBeSetIfConfiguredViaOptions() {
+ final List<EnvVar> envVars = this.resultMainContainer.getEnv();
+ assertThat(
+ envVars.stream()
+ .anyMatch(
+ envVar ->
+ envVar.getName().equals(Constants.ENV_FLINK_LOG_DIR)
+ && envVar.getValue()
+ .equals(USER_DEFINED_FLINK_LOG_DIR)),
+ is(true));
+ }
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
index 8e2c38f..546e343 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
@@ -72,6 +72,8 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
private static final Long RESOURCE_AMOUNT = 2L;
private static final String RESOURCE_CONFIG_KEY = "test.com/test";
+ private static final String USER_DEFINED_FLINK_LOG_DIR = "/path/of/flink-log";
+
private Pod resultPod;
private Container resultMainContainer;
@@ -98,6 +100,7 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
RESOURCE_NAME,
KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX),
RESOURCE_CONFIG_KEY);
+ this.flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, USER_DEFINED_FLINK_LOG_DIR);
}
@Override
@@ -177,8 +180,7 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
final Map<String, String> resultEnvVars =
this.resultMainContainer.getEnv().stream()
.collect(Collectors.toMap(EnvVar::getName, EnvVar::getValue));
-
- assertEquals(expectedEnvVars, resultEnvVars);
+ expectedEnvVars.forEach((k, v) -> assertThat(resultEnvVars.get(k), is(v)));
}
@Test
@@ -235,4 +237,17 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
this.resultPod.getSpec().getTolerations(),
Matchers.containsInAnyOrder(TOLERATION.toArray()));
}
+
+ @Test
+ public void testFlinkLogDirEnvShouldBeSetIfConfiguredViaOptions() {
+ final List<EnvVar> envVars = this.resultMainContainer.getEnv();
+ assertThat(
+ envVars.stream()
+ .anyMatch(
+ envVar ->
+ envVar.getName().equals(Constants.ENV_FLINK_LOG_DIR)
+ && envVar.getValue()
+ .equals(USER_DEFINED_FLINK_LOG_DIR)),
+ is(true));
+ }
}