You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/18 15:05:38 UTC
[incubator-streampark] branch dev updated: [improve] on k8s mode kubernetes.container.image.pull-policy improvement (#2057)
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new a1ec26155 [improve] on k8s mode kubernetes.container.image.pull-policy improvement (#2057)
a1ec26155 is described below
commit a1ec2615534ca29fe484ffd70d1cf868fdc0b406
Author: benjobs <be...@apache.org>
AuthorDate: Fri Nov 18 23:05:32 2022 +0800
[improve] on k8s mode kubernetes.container.image.pull-policy improvement (#2057)
---
.../streampark/common/conf/ConfigConst.scala | 2 +
.../console/core/entity/Application.java | 23 +++-------
.../core/service/impl/ApplicationServiceImpl.java | 50 ++++++++--------------
.../flink/submit/bean/CancelRequest.scala | 3 +-
.../flink/submit/bean/SubmitRequest.scala | 2 +-
.../flink/submit/impl/RemoteSubmit.scala | 14 +-----
.../flink/submit/impl/YarnSessionSubmit.scala | 3 +-
7 files changed, 31 insertions(+), 66 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 8657ebb1e..a10d777b5 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -126,6 +126,8 @@ object ConfigConst {
val KEY_YARN_APP_QUEUE = "yarn.application.queue"
+ val KEY_K8S_IMAGE_PULL_POLICY = "kubernetes.container.image.pull-policy"
+
// ---table---
val KEY_FLINK_TABLE_PLANNER = "flink.table.planner"
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4883ccf03..8be656d51 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -654,21 +654,7 @@ public class Application implements Serializable {
@SneakyThrows
public void doSetHotParams() {
- Map<String, String> hotParams = new HashMap<>();
- ExecutionMode executionModeEnum = this.getExecutionModeEnum();
- if (ExecutionMode.YARN_APPLICATION.equals(executionModeEnum)) {
- if (StringUtils.isNotEmpty(this.getYarnQueue())) {
- hotParams.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.getYarnQueue());
- }
- }
- if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
- if (StringUtils.isNotEmpty(this.getYarnSessionClusterId())) {
- hotParams.put("yarn.application.id", this.getYarnSessionClusterId());
- }
- }
- if (!hotParams.isEmpty()) {
- this.setHotParams(JacksonUtils.write(hotParams));
- }
+ updateHotParams(this);
}
@SneakyThrows
@@ -679,13 +665,14 @@ public class Application implements Serializable {
if (StringUtils.isNotEmpty(appParam.getYarnQueue())) {
hotParams.put(ConfigConst.KEY_YARN_APP_QUEUE(), appParam.getYarnQueue());
}
- }
- if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
+ } else if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
if (StringUtils.isNotEmpty(appParam.getYarnSessionClusterId())) {
hotParams.put(ConfigConst.KEY_YARN_APP_ID(), appParam.getYarnSessionClusterId());
}
}
- this.setHotParams(JacksonUtils.write(hotParams));
+ if (!hotParams.isEmpty()) {
+ this.setHotParams(JacksonUtils.write(hotParams));
+ }
}
@Data
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 7df4980f9..6eccd405c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -991,16 +991,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
}
- if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) {
- if (!application.getHotParamsMap().isEmpty()) {
+ if (!application.getHotParamsMap().isEmpty()) {
+ if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) {
if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
application.setYarnQueue(application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE()).toString());
}
- }
- }
-
- if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
- if (!application.getHotParamsMap().isEmpty()) {
+ } else if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_ID())) {
application.setYarnSessionClusterId(application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID()).toString());
}
@@ -1065,9 +1061,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
}
- Map<String, Object> extraParameter = new HashMap<>(0);
-
- Map<String, Object> optionMap = application.getOptionMap();
+ Map<String, Object> optionMap = new HashMap<>();
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
@@ -1076,21 +1070,15 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
"the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
URI activeAddress = cluster.getActiveAddress();
- extraParameter.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
- extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());
- }
-
- if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
- String yarnQueue = (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
- if (yarnQueue != null) {
- optionMap.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
- }
+ optionMap.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
+ optionMap.put(RestOptions.PORT.key(), activeAddress.getPort());
+ } else if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
AssertUtils.state(cluster != null,
String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
"the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
- extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
+ optionMap.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
}
}
@@ -1108,8 +1096,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
appParam.getDrain(),
customSavepoint,
application.getK8sNamespace(),
- application.getDynamicProperties(),
- extraParameter
+ optionMap
);
CompletableFuture<CancelResponse> cancelFuture =
@@ -1305,10 +1292,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
Map<String, String> properties = FlinkSubmitter.extractDynamicPropertiesAsJava(application.getDynamicProperties());
- Map<String, Object> extraParameter = new HashMap<>(0);
+ Map<String, Object> optionMap = application.getOptionMap();
if (appParam.getAllowNonRestored()) {
- extraParameter.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
+ optionMap.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
}
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
@@ -1317,11 +1304,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
String.format("The clusterId=%s cannot be find, maybe the clusterId is wrong or " +
"the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
URI activeAddress = cluster.getActiveAddress();
- extraParameter.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
- extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());
- }
-
- if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
+ optionMap.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
+ optionMap.put(RestOptions.PORT.key(), activeAddress.getPort());
+ } else if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
String yarnQueue = (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
if (yarnQueue != null) {
properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
@@ -1331,10 +1316,13 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
AssertUtils.state(cluster != null,
String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
"the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
- extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
+ optionMap.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
}
+ } else if (ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
+ optionMap.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
}
+ Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
@@ -1406,7 +1394,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
application.getApplicationType(),
getSavePointed(appParam),
appParam.getFlameGraph() ? getFlameGraph(application) : null,
- application.getOptionMap(),
+ optionMap,
properties,
applicationArgs,
buildResult,
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
index 162743a72..902d3d137 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
@@ -32,8 +32,7 @@ case class CancelRequest(flinkVersion: FlinkVersion,
withDrain: Boolean,
customSavePointPath: String,
kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
- @Nullable properties: String,
- @Nullable extraParameter: JavaMap[String, Any]
+ @Nullable option: JavaMap[String, Any]
) {
}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index 601fc8ade..f46cc6782 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -74,7 +74,7 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
- lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
+ lazy val allowNonRestoredState = Try(option.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
lazy val savepointRestoreSettings: SavepointRestoreSettings = {
savePoint match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
index 964ee479f..5d4aab09c 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
@@ -41,16 +41,6 @@ object RemoteSubmit extends FlinkSubmitTrait {
* @param flinkConfig
*/
override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
- flinkConfig
- .safeSet(RestOptions.ADDRESS, submitRequest.extraParameter.get(RestOptions.ADDRESS.key()).toString)
- .safeSet[JavaInt](RestOptions.PORT, submitRequest.extraParameter.get(RestOptions.PORT.key()).toString.toInt)
-
- logInfo(
- s"""
- |------------------------------------------------------------------
- |Effective submit configuration: $flinkConfig
- |------------------------------------------------------------------
- |""".stripMargin)
}
override def doSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
@@ -62,8 +52,8 @@ object RemoteSubmit extends FlinkSubmitTrait {
override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = {
flinkConfig
.safeSet(DeploymentOptions.TARGET, cancelRequest.executionMode.getName)
- .safeSet(RestOptions.ADDRESS, cancelRequest.extraParameter.get(RestOptions.ADDRESS.key()).toString)
- .safeSet[JavaInt](RestOptions.PORT, cancelRequest.extraParameter.get(RestOptions.PORT.key()).toString.toInt)
+ .safeSet(RestOptions.ADDRESS, cancelRequest.option.get(RestOptions.ADDRESS.key()).toString)
+ .safeSet[JavaInt](RestOptions.PORT, cancelRequest.option.get(RestOptions.PORT.key()).toString.toInt)
logInfo(
s"""
|------------------------------------------------------------------
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
index 1231d02e3..610990dc0 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
@@ -49,7 +49,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
flinkConfig
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
- .safeSet(YarnConfigOptions.APPLICATION_ID, submitRequest.extraParameter.get(KEY_YARN_APP_ID).toString)
logInfo(
s"""
@@ -138,7 +137,7 @@ object YarnSessionSubmit extends YarnSubmitTrait {
}
override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = {
- flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.extraParameter.get(KEY_YARN_APP_ID).toString)
+ flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.option.get(KEY_YARN_APP_ID).toString)
flinkConfig.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
logInfo(
s"""