You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/03 08:13:35 UTC

[incubator-streampark] branch dev updated: [Improve] flink job appname Improvement (#1954)

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new cc71e161b [Improve] flink job appname Improvement (#1954)
cc71e161b is described below

commit cc71e161bcd0c9808d17130318d3c1eda8ab8d17
Author: benjobs <be...@apache.org>
AuthorDate: Thu Nov 3 16:13:28 2022 +0800

    [Improve] flink job appname Improvement (#1954)
---
 .../flink/core/scala/FlinkStreaming.scala          |  7 +---
 .../streampark/flink/core/EnhancerImplicit.scala   | 46 ++++++++++++++++++++++
 .../flink/core/FlinkStreamTableTrait.scala         | 11 +-----
 .../flink/core/FlinkTableInitializer.scala         |  7 +---
 .../streampark/flink/core/FlinkTableTrait.scala    |  8 ++--
 .../streampark-flink-shims_flink-1.15/pom.xml      |  4 +-
 .../streampark-flink-shims_flink-1.16/pom.xml      |  4 +-
 .../flink/submit/trait/FlinkSubmitTrait.scala      |  4 +-
 8 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
index 86ea0e40f..2660cae84 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.core.scala
 
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
+import org.apache.streampark.flink.core.EnhancerImplicit._
 import org.apache.streampark.flink.core.{FlinkStreamingInitializer, StreamEnvConfig}
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -46,11 +47,7 @@ class StreamingContext(val parameter: ParameterTool, private val environment: St
   def start(): JobExecutionResult = execute()
 
   @deprecated override def execute(): JobExecutionResult = {
-    val appName = (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
-      case (appName: String, _) => appName
-      case (null, appName: String) => appName
-      case _ => ""
-    }
+    val appName = parameter.getAppName(required = true)
     execute(appName)
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
new file mode 100644
index 000000000..4e50516d5
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_APP_NAME}
+import org.apache.streampark.common.util.DeflaterUtils
+
+import scala.util.Try
+
+
+object EnhancerImplicit {
+
+  implicit class EnhanceParameterTool(parameterTool: ParameterTool) {
+
+    def getAppName(name: String = null, required: Boolean = false): String = {
+      val appName = name match {
+        case null =>
+          Try(DeflaterUtils.unzipString(parameterTool.get(KEY_APP_NAME(), null)))
+            .getOrElse(parameterTool.get(KEY_FLINK_APP_NAME, null))
+        case x => x
+      }
+      if (required) {
+        require(appName != null, "[StreamPark] Application name cannot be null")
+      }
+      appName
+    }
+
+  }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index 54d86e129..36f557f19 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -18,6 +18,7 @@ package org.apache.streampark.flink.core
 
 import com.esotericsoftware.kryo.Serializer
 import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.flink.core.EnhancerImplicit._
 import org.apache.flink.api.common.cache.DistributedCache
 import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
@@ -67,15 +68,7 @@ abstract class FlinkStreamTableTrait(val parameter: ParameterTool,
    * Recommended to use this Api to start tasks
    */
   def start(name: String = null): JobExecutionResult = {
-    val appName = name match {
-      case null =>
-        (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
-          case (appName: String, _) => appName
-          case (null, appName: String) => appName
-          case _ => ""
-        }
-      case x => x
-    }
+    val appName = parameter.getAppName(name, true)
     execute(appName)
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 48b90a149..01574bcf4 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -27,6 +27,7 @@ import org.apache.streampark.common.enums.TableMode.TableMode
 import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.streampark.flink.core.conf.FlinkConfiguration
+import org.apache.streampark.flink.core.EnhancerImplicit._
 
 import java.io.File
 import scala.collection.JavaConversions._
@@ -246,11 +247,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
         }
         localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, setting)
     }
-    val appName = (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
-      case (appName: String, _) => appName
-      case (null, appName: String) => appName
-      case _ => null
-    }
+    val appName = parameter.getAppName()
     if (appName != null) {
       tableMode match {
         case TableMode.batch => localTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 34eb19524..92d4b96b4 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -17,6 +17,7 @@
 package org.apache.streampark.flink.core
 
 import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.flink.core.EnhancerImplicit._
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api._
@@ -26,6 +27,7 @@ import org.apache.flink.table.functions._
 import org.apache.flink.table.module.Module
 import org.apache.flink.table.types.AbstractDataType
 
+
 import java.lang
 import java.util.Optional
 
@@ -33,11 +35,7 @@ abstract class FlinkTableTrait(val parameter: ParameterTool,
                                private val tableEnv: TableEnvironment) extends TableEnvironment {
 
   def start(): JobExecutionResult = {
-    val appName = (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
-      case (appName: String, _) => appName
-      case (null, appName: String) => appName
-      case _ => ""
-    }
+    val appName = parameter.getAppName(required = true)
     execute(appName)
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
index 263806d8a..6ae2e9393 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
@@ -130,9 +130,7 @@
                                 <filter>
                                     <artifact>*:*</artifact>
                                     <excludes>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
+                                        <exclude>META-INF/*</exclude>
                                     </excludes>
                                 </filter>
                             </filters>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
index 0b75ffeb6..5b894d6fe 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
@@ -134,9 +134,7 @@
                                 <filter>
                                     <artifact>*:*</artifact>
                                     <excludes>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
+                                        <exclude>META-INF/*</exclude>
                                     </excludes>
                                 </filter>
                             </filters>
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 1c7369f0c..a46d3006c 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -33,7 +33,7 @@ import org.apache.flink.util.Preconditions.checkNotNull
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.conf.{ConfigConst, Workspace}
 import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, ResolveOrder}
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils}
+import org.apache.streampark.common.util.{DeflaterUtils, Logger, SystemPropertyUtils, Utils}
 import org.apache.streampark.flink.core.conf.FlinkRunOption
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.submit.bean._
@@ -413,7 +413,7 @@ trait FlinkSubmitTrait extends Logger {
       programArgs += PARAM_KEY_FLINK_CONF
       programArgs += submitRequest.flinkYaml
       programArgs += PARAM_KEY_APP_NAME
-      programArgs += submitRequest.effectiveAppName
+      programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
       programArgs += PARAM_KEY_FLINK_PARALLELISM
       programArgs += getParallelism(submitRequest).toString
       submitRequest.developmentMode match {