You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/18 15:05:38 UTC

[incubator-streampark] branch dev updated: [improve] on k8s mode kubernetes.container.image.pull-policy improvement (#2057)

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new a1ec26155 [improve] on k8s mode kubernetes.container.image.pull-policy improvement (#2057)
a1ec26155 is described below

commit a1ec2615534ca29fe484ffd70d1cf868fdc0b406
Author: benjobs <be...@apache.org>
AuthorDate: Fri Nov 18 23:05:32 2022 +0800

    [improve] on k8s mode kubernetes.container.image.pull-policy improvement (#2057)
---
 .../streampark/common/conf/ConfigConst.scala       |  2 +
 .../console/core/entity/Application.java           | 23 +++-------
 .../core/service/impl/ApplicationServiceImpl.java  | 50 ++++++++--------------
 .../flink/submit/bean/CancelRequest.scala          |  3 +-
 .../flink/submit/bean/SubmitRequest.scala          |  2 +-
 .../flink/submit/impl/RemoteSubmit.scala           | 14 +-----
 .../flink/submit/impl/YarnSessionSubmit.scala      |  3 +-
 7 files changed, 31 insertions(+), 66 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 8657ebb1e..a10d777b5 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -126,6 +126,8 @@ object ConfigConst {
 
   val KEY_YARN_APP_QUEUE = "yarn.application.queue"
 
+  val KEY_K8S_IMAGE_PULL_POLICY = "kubernetes.container.image.pull-policy"
+
   // ---table---
   val KEY_FLINK_TABLE_PLANNER = "flink.table.planner"
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4883ccf03..8be656d51 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -654,21 +654,7 @@ public class Application implements Serializable {
 
     @SneakyThrows
     public void doSetHotParams() {
-        Map<String, String> hotParams = new HashMap<>();
-        ExecutionMode executionModeEnum = this.getExecutionModeEnum();
-        if (ExecutionMode.YARN_APPLICATION.equals(executionModeEnum)) {
-            if (StringUtils.isNotEmpty(this.getYarnQueue())) {
-                hotParams.put(ConfigConst.KEY_YARN_APP_QUEUE(), this.getYarnQueue());
-            }
-        }
-        if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
-            if (StringUtils.isNotEmpty(this.getYarnSessionClusterId())) {
-                hotParams.put("yarn.application.id", this.getYarnSessionClusterId());
-            }
-        }
-        if (!hotParams.isEmpty()) {
-            this.setHotParams(JacksonUtils.write(hotParams));
-        }
+        updateHotParams(this);
     }
 
     @SneakyThrows
@@ -679,13 +665,14 @@ public class Application implements Serializable {
             if (StringUtils.isNotEmpty(appParam.getYarnQueue())) {
                 hotParams.put(ConfigConst.KEY_YARN_APP_QUEUE(), appParam.getYarnQueue());
             }
-        }
-        if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
+        } else if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
             if (StringUtils.isNotEmpty(appParam.getYarnSessionClusterId())) {
                 hotParams.put(ConfigConst.KEY_YARN_APP_ID(), appParam.getYarnSessionClusterId());
             }
         }
-        this.setHotParams(JacksonUtils.write(hotParams));
+        if (!hotParams.isEmpty()) {
+            this.setHotParams(JacksonUtils.write(hotParams));
+        }
     }
 
     @Data
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 7df4980f9..6eccd405c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -991,16 +991,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             }
         }
 
-        if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) {
-            if (!application.getHotParamsMap().isEmpty()) {
+        if (!application.getHotParamsMap().isEmpty()) {
+            if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) {
                 if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
                     application.setYarnQueue(application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE()).toString());
                 }
-            }
-        }
-
-        if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
-            if (!application.getHotParamsMap().isEmpty()) {
+            } else if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
                 if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_ID())) {
                     application.setYarnSessionClusterId(application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID()).toString());
                 }
@@ -1065,9 +1061,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             }
         }
 
-        Map<String, Object> extraParameter = new HashMap<>(0);
-
-        Map<String, Object> optionMap = application.getOptionMap();
+        Map<String, Object> optionMap = new HashMap<>();
 
         if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
             FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
@@ -1076,21 +1070,15 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                         "the cluster has been deleted. Please contact the Admin.",
                     application.getFlinkClusterId()));
             URI activeAddress = cluster.getActiveAddress();
-            extraParameter.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
-            extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());
-        }
-
-        if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
-            String yarnQueue = (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
-            if (yarnQueue != null) {
-                optionMap.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
-            }
+            optionMap.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
+            optionMap.put(RestOptions.PORT.key(), activeAddress.getPort());
+        } else if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
             if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
                 FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
                 AssertUtils.state(cluster != null,
                     String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
                         "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
-                extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
+                optionMap.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
             }
         }
 
@@ -1108,8 +1096,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             appParam.getDrain(),
             customSavepoint,
             application.getK8sNamespace(),
-            application.getDynamicProperties(),
-            extraParameter
+            optionMap
         );
 
         CompletableFuture<CancelResponse> cancelFuture =
@@ -1305,10 +1292,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         Map<String, String> properties = FlinkSubmitter.extractDynamicPropertiesAsJava(application.getDynamicProperties());
 
-        Map<String, Object> extraParameter = new HashMap<>(0);
+        Map<String, Object> optionMap = application.getOptionMap();
 
         if (appParam.getAllowNonRestored()) {
-            extraParameter.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
+            optionMap.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
         }
 
         if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
@@ -1317,11 +1304,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                 String.format("The clusterId=%s cannot be find, maybe the clusterId is wrong or " +
                     "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
             URI activeAddress = cluster.getActiveAddress();
-            extraParameter.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
-            extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());
-        }
-
-        if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
+            optionMap.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
+            optionMap.put(RestOptions.PORT.key(), activeAddress.getPort());
+        } else if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
             String yarnQueue = (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
             if (yarnQueue != null) {
                 properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
@@ -1331,10 +1316,13 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                 AssertUtils.state(cluster != null,
                     String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
                         "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
-                extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
+                optionMap.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
             }
+        } else if (ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
+            optionMap.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
         }
 
+        Map<String, Object> extraParameter = new HashMap<>(0);
         if (application.isFlinkSqlJob()) {
             FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
             // Get the sql of the replaced placeholder
@@ -1406,7 +1394,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             application.getApplicationType(),
             getSavePointed(appParam),
             appParam.getFlameGraph() ? getFlameGraph(application) : null,
-            application.getOptionMap(),
+            optionMap,
             properties,
             applicationArgs,
             buildResult,
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
index 162743a72..902d3d137 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
@@ -32,8 +32,7 @@ case class CancelRequest(flinkVersion: FlinkVersion,
                          withDrain: Boolean,
                          customSavePointPath: String,
                          kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
-                         @Nullable properties: String,
-                         @Nullable extraParameter: JavaMap[String, Any]
+                         @Nullable option: JavaMap[String, Any]
                       ) {
 
 }
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index 601fc8ade..f46cc6782 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -74,7 +74,7 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
 
   lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
 
-  lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
+  lazy val allowNonRestoredState = Try(option.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
 
   lazy val savepointRestoreSettings: SavepointRestoreSettings = {
     savePoint match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
index 964ee479f..5d4aab09c 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
@@ -41,16 +41,6 @@ object RemoteSubmit extends FlinkSubmitTrait {
    * @param flinkConfig
    */
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
-    flinkConfig
-      .safeSet(RestOptions.ADDRESS, submitRequest.extraParameter.get(RestOptions.ADDRESS.key()).toString)
-      .safeSet[JavaInt](RestOptions.PORT, submitRequest.extraParameter.get(RestOptions.PORT.key()).toString.toInt)
-
-    logInfo(
-      s"""
-         |------------------------------------------------------------------
-         |Effective submit configuration: $flinkConfig
-         |------------------------------------------------------------------
-         |""".stripMargin)
   }
 
   override def doSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
@@ -62,8 +52,8 @@ object RemoteSubmit extends FlinkSubmitTrait {
   override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = {
     flinkConfig
       .safeSet(DeploymentOptions.TARGET, cancelRequest.executionMode.getName)
-      .safeSet(RestOptions.ADDRESS, cancelRequest.extraParameter.get(RestOptions.ADDRESS.key()).toString)
-      .safeSet[JavaInt](RestOptions.PORT, cancelRequest.extraParameter.get(RestOptions.PORT.key()).toString.toInt)
+      .safeSet(RestOptions.ADDRESS, cancelRequest.option.get(RestOptions.ADDRESS.key()).toString)
+      .safeSet[JavaInt](RestOptions.PORT, cancelRequest.option.get(RestOptions.PORT.key()).toString.toInt)
     logInfo(
       s"""
          |------------------------------------------------------------------
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
index 1231d02e3..610990dc0 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
@@ -49,7 +49,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
     flinkConfig
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
-      .safeSet(YarnConfigOptions.APPLICATION_ID, submitRequest.extraParameter.get(KEY_YARN_APP_ID).toString)
 
     logInfo(
       s"""
@@ -138,7 +137,7 @@ object YarnSessionSubmit extends YarnSubmitTrait {
   }
 
   override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = {
-    flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.extraParameter.get(KEY_YARN_APP_ID).toString)
+    flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, cancelRequest.option.get(KEY_YARN_APP_ID).toString)
     flinkConfig.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
     logInfo(
       s"""