You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/03 08:13:35 UTC
[incubator-streampark] branch dev updated: [Improve] flink job appname Improvement (#1954)
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new cc71e161b [Improve] flink job appname Improvement (#1954)
cc71e161b is described below
commit cc71e161bcd0c9808d17130318d3c1eda8ab8d17
Author: benjobs <be...@apache.org>
AuthorDate: Thu Nov 3 16:13:28 2022 +0800
[Improve] flink job appname Improvement (#1954)
---
.../flink/core/scala/FlinkStreaming.scala | 7 +---
.../streampark/flink/core/EnhancerImplicit.scala | 46 ++++++++++++++++++++++
.../flink/core/FlinkStreamTableTrait.scala | 11 +-----
.../flink/core/FlinkTableInitializer.scala | 7 +---
.../streampark/flink/core/FlinkTableTrait.scala | 8 ++--
.../streampark-flink-shims_flink-1.15/pom.xml | 4 +-
.../streampark-flink-shims_flink-1.16/pom.xml | 4 +-
.../flink/submit/trait/FlinkSubmitTrait.scala | 4 +-
8 files changed, 59 insertions(+), 32 deletions(-)
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
index 86ea0e40f..2660cae84 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.core.scala
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
+import org.apache.streampark.flink.core.EnhancerImplicit._
import org.apache.streampark.flink.core.{FlinkStreamingInitializer, StreamEnvConfig}
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -46,11 +47,7 @@ class StreamingContext(val parameter: ParameterTool, private val environment: St
def start(): JobExecutionResult = execute()
@deprecated override def execute(): JobExecutionResult = {
- val appName = (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
- case (appName: String, _) => appName
- case (null, appName: String) => appName
- case _ => ""
- }
+ val appName = parameter.getAppName(required = true)
execute(appName)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
new file mode 100644
index 000000000..4e50516d5
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_APP_NAME}
+import org.apache.streampark.common.util.DeflaterUtils
+
+import scala.util.Try
+
+
+/**
+ * Implicit enrichments for Flink's [[ParameterTool]].
+ */
+object EnhancerImplicit {
+
+  implicit class EnhanceParameterTool(parameterTool: ParameterTool) {
+
+    /**
+     * Resolves the application name for a job.
+     *
+     * Resolution order:
+     *  1. `name`, when it is not null;
+     *  2. the value stored under `KEY_APP_NAME()`, passed through `DeflaterUtils.unzipString`;
+     *  3. the value stored under `KEY_FLINK_APP_NAME`.
+     *
+     * @param name     explicit application name; null means "look it up in the parameters"
+     * @param required when true, a missing name is rejected instead of being returned as null
+     * @return the resolved name, or null when nothing is configured and `required` is false
+     * @throws IllegalArgumentException if `required` is true and no name could be resolved
+     */
+    def getAppName(name: String = null, required: Boolean = false): String = {
+      val appName = Option(name)
+        .orElse {
+          // Only attempt to unzip when the key is actually present, and treat a null
+          // result exactly like a thrown exception: both mean "no usable value here",
+          // so resolution must continue with KEY_FLINK_APP_NAME.
+          Option(parameterTool.get(KEY_APP_NAME(), null))
+            .flatMap(raw => Try(DeflaterUtils.unzipString(raw)).toOption)
+            .flatMap(Option(_))
+        }
+        .orElse(Option(parameterTool.get(KEY_FLINK_APP_NAME, null)))
+        .orNull
+      if (required) {
+        require(appName != null, "[StreamPark] Application name cannot be null")
+      }
+      appName
+    }
+
+  }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index 54d86e129..36f557f19 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -18,6 +18,7 @@ package org.apache.streampark.flink.core
import com.esotericsoftware.kryo.Serializer
import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.flink.core.EnhancerImplicit._
import org.apache.flink.api.common.cache.DistributedCache
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
@@ -67,15 +68,7 @@ abstract class FlinkStreamTableTrait(val parameter: ParameterTool,
* Recommended to use this Api to start tasks
*/
def start(name: String = null): JobExecutionResult = {
- val appName = name match {
- case null =>
- (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
- case (appName: String, _) => appName
- case (null, appName: String) => appName
- case _ => ""
- }
- case x => x
- }
+ val appName = parameter.getAppName(name, true)
execute(appName)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 48b90a149..01574bcf4 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -27,6 +27,7 @@ import org.apache.streampark.common.enums.TableMode.TableMode
import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.streampark.flink.core.conf.FlinkConfiguration
+import org.apache.streampark.flink.core.EnhancerImplicit._
import java.io.File
import scala.collection.JavaConversions._
@@ -246,11 +247,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
}
localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, setting)
}
- val appName = (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
- case (appName: String, _) => appName
- case (null, appName: String) => appName
- case _ => null
- }
+ val appName = parameter.getAppName()
if (appName != null) {
tableMode match {
case TableMode.batch => localTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 34eb19524..92d4b96b4 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.flink.core.EnhancerImplicit._
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api._
@@ -26,6 +27,7 @@ import org.apache.flink.table.functions._
import org.apache.flink.table.module.Module
import org.apache.flink.table.types.AbstractDataType
+
import java.lang
import java.util.Optional
@@ -33,11 +35,7 @@ abstract class FlinkTableTrait(val parameter: ParameterTool,
private val tableEnv: TableEnvironment) extends TableEnvironment {
def start(): JobExecutionResult = {
- val appName = (parameter.get(KEY_APP_NAME(), null), parameter.get(KEY_FLINK_APP_NAME, null)) match {
- case (appName: String, _) => appName
- case (null, appName: String) => appName
- case _ => ""
- }
+ val appName = parameter.getAppName(required = true)
execute(appName)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
index 263806d8a..6ae2e9393 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/pom.xml
@@ -130,9 +130,7 @@
<filter>
<artifact>*:*</artifact>
<excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
+ <exclude>META-INF/*</exclude>
</excludes>
</filter>
</filters>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
index 0b75ffeb6..5b894d6fe 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/pom.xml
@@ -134,9 +134,7 @@
<filter>
<artifact>*:*</artifact>
<excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
+ <exclude>META-INF/*</exclude>
</excludes>
</filter>
</filters>
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 1c7369f0c..a46d3006c 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -33,7 +33,7 @@ import org.apache.flink.util.Preconditions.checkNotNull
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.{ConfigConst, Workspace}
import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, ResolveOrder}
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils}
+import org.apache.streampark.common.util.{DeflaterUtils, Logger, SystemPropertyUtils, Utils}
import org.apache.streampark.flink.core.conf.FlinkRunOption
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.submit.bean._
@@ -413,7 +413,7 @@ trait FlinkSubmitTrait extends Logger {
programArgs += PARAM_KEY_FLINK_CONF
programArgs += submitRequest.flinkYaml
programArgs += PARAM_KEY_APP_NAME
- programArgs += submitRequest.effectiveAppName
+ programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
programArgs += PARAM_KEY_FLINK_PARALLELISM
programArgs += getParallelism(submitRequest).toString
submitRequest.developmentMode match {