You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2023/11/05 14:08:03 UTC
(incubator-streampark) branch dev updated: [Improve] submit flink job userclassPaths improvement (#3313)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 5b727cc26 [Improve] submit flink job userclassPaths improvement (#3313)
5b727cc26 is described below
commit 5b727cc26dd296ff16072359b8fb52e3f3f48b36
Author: benjobs <be...@apache.org>
AuthorDate: Sun Nov 5 22:07:57 2023 +0800
[Improve] submit flink job userclassPaths improvement (#3313)
* [Improve] submit flink job userclassPaths improvement
* userClasspath minor improvement
---
.../flink/client/bean/SubmitRequest.scala | 14 +-
.../scala/org/apache/commons/cli/CommandLine.java | 197 ---------------------
.../flink/client/impl/YarnApplicationClient.scala | 4 -
.../flink/client/trait/FlinkClientTrait.scala | 24 +--
.../streampark/flink/core/FlinkSqlValidator.scala | 2 +-
5 files changed, 15 insertions(+), 226 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 25e17120d..2c1a72e51 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.client.bean
import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion, Workspace}
+import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.enums._
import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, PropertiesUtils}
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointResto
import javax.annotation.Nullable
import java.io.File
+import java.net.URL
import java.util.{Map => JavaMap}
import scala.collection.convert.ImplicitConversions._
@@ -55,7 +56,8 @@ case class SubmitRequest(
@Nullable k8sSubmitParam: KubernetesSubmitParam,
@Nullable extraParameter: JavaMap[String, Any]) {
- lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
+ private[this] lazy val appProperties: Map[String, String] = getParameterMap(
+ KEY_FLINK_PROPERTY_PREFIX)
lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX)
@@ -68,9 +70,15 @@ case class SubmitRequest(
lazy val effectiveAppName: String =
if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else this.appName
+ lazy val classPaths: List[URL] = {
+ val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
+ val lib = Try(new File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
+ flinkVersion.flinkLibs ++ lib
+ }
+
lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
- lazy val allowNonRestoredState = Try(
+ lazy val allowNonRestoredState: Boolean = Try(
properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean)
.getOrElse(false)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java
deleted file mode 100644
index f1380fd9a..000000000
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.commons.cli;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-public class CommandLine implements Serializable {
-
- /** The serial version UID. */
- private static final long serialVersionUID = 1L;
-
- /** The unrecognized options/arguments */
- private final List<String> args = new LinkedList<>();
-
- /** The processed options */
- private final List<Option> options = new ArrayList<>();
-
- /**
- * Creates a command line.
- */
- protected CommandLine() {
- // nothing to do
- }
-
- /**
- * Retrieve the map of values associated to the option. This is convenient for options specifying Java properties like
- * <code>-Dparam1=value1
- * -Dparam2=value2</code>. The first argument of the option is the key, and the 2nd argument is the value. If the option
- * has only one argument ({@code -Dfoo}) it is considered as a boolean flag and the value is {@code "true"}.
- *
- * @param option name of the option.
- * @return The Properties mapped by the option, never {@code null} even if the option doesn't exists.
- * @since 1.5.0
- */
- public Properties getOptionProperties(final Option option) {
- final Properties props = new Properties();
-
- for (final Option processedOption : options) {
- if (processedOption.equals(option)) {
- final List<String> values = processedOption.getValuesList();
- if (values.size() >= 2) {
- // use the first 2 arguments as the key/value pair
- props.put(values.get(0), values.get(1));
- } else if (values.size() == 1) {
- // no explicit value, handle it as a boolean
- props.put(values.get(0), "true");
- }
- }
- }
-
- return props;
- }
-
- /**
- * Retrieve the first argument, if any, of this option.
- *
- * @param option the name of the option.
- * @return Value of the argument if option is set, and has an argument, otherwise null.
- * @since 1.5.0
- */
- public String getOptionValue(final Option option) {
- if (option == null) {
- return null;
- }
- final String[] values = getOptionValues(option);
- return values == null ? null : values[0];
- }
-
- /**
- * Retrieve the first argument, if any, of an option.
- *
- * @param option name of the option.
- * @param defaultValue is the default value to be returned if the option is not specified.
- * @return Value of the argument if option is set, and has an argument, otherwise {@code defaultValue}.
- * @since 1.5.0
- */
- public String getOptionValue(final Option option, final String defaultValue) {
- final String answer = getOptionValue(option);
- return answer != null ? answer : defaultValue;
- }
-
- /**
- * Retrieves the array of values, if any, of an option.
- *
- * @param option string name of the option.
- * @return Values of the argument if option is set, and has an argument, otherwise null.
- * @since 1.5.0
- */
- public String[] getOptionValues(final Option option) {
- final List<String> values = new ArrayList<>();
-
- for (final Option processedOption : options) {
- if (processedOption.equals(option)) {
- values.addAll(processedOption.getValuesList());
- }
- }
-
- return values.isEmpty() ? null : values.toArray(new String[values.size()]);
- }
-
- /**
- * Return a version of this {@code Option} converted to a particular type.
- *
- * @param opt the name of the option.
- * @return the value parsed into a particular object.
- * @throws ParseException if there are problems turning the option value into the desired type
- * @see PatternOptionBuilder
- * @since 1.5.0
- */
- public Object getParsedOptionValue(final char opt) throws ParseException {
- return getParsedOptionValue(String.valueOf(opt));
- }
-
- /**
- * Return a version of this {@code Option} converted to a particular type.
- *
- * @param option the name of the option.
- * @return the value parsed into a particular object.
- * @throws ParseException if there are problems turning the option value into the desired type
- * @see PatternOptionBuilder
- * @since 1.5.0
- */
- public Object getParsedOptionValue(final Option option) throws ParseException {
- if (option == null) {
- return null;
- }
- final String res = getOptionValue(option);
- if (res == null) {
- return null;
- }
- return TypeHandler.createValue(res, option.getType());
- }
-
-
- /**
- * Tests to see if an option has been set.
- *
- * @param opt the option to check.
- * @return true if set, false if not.
- * @since 1.5.0
- */
- public boolean hasOption(final Option opt) {
- return options.contains(opt);
- }
-
-
- /**
- * Return a version of this {@code Option} converted to a particular type.
- *
- * @param opt the name of the option.
- * @return the value parsed into a particular object.
- * @throws ParseException if there are problems turning the option value into the desired type
- * @see PatternOptionBuilder
- * @since 1.2
- */
- public Object getParsedOptionValue(final String opt) throws ParseException {
- return getParsedOptionValue(resolveOption(opt));
- }
-
-
- /**
- * Retrieves the option object given the long or short option as a String
- *
- * @param opt short or long name of the option.
- * @return Canonicalized option.
- */
- private Option resolveOption(String opt) {
- opt = Util.stripLeadingHyphens(opt);
- for (final Option option : options) {
- if (opt.equals(option.getOpt()) || opt.equals(option.getLongOpt())) {
- return option;
- }
-
- }
- return null;
- }
-
-}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f7b38cdde..3470e8e67 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -101,10 +101,6 @@ object YarnApplicationClient extends YarnClientTrait {
if (!FsOperator.hdfs.exists(pyVenv)) {
throw new RuntimeException(s"$pyVenv File does not exist")
}
-
- // including $app/lib
- includingPipelineJars(submitRequest, flinkConfig)
-
// yarn.ship-files
val shipFiles = new util.ArrayList[String]()
shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 435d882c7..020301b73 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
+import org.apache.streampark.common.enums._
import org.apache.streampark.common.fs.FsOperator
import org.apache.streampark.common.util._
import org.apache.streampark.flink.client.bean._
@@ -43,6 +43,7 @@ import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
import java.io.File
+import java.net.URL
import java.util
import java.util.{Collections, List => JavaList, Map => JavaMap}
@@ -256,9 +257,7 @@ trait FlinkClientTrait extends Logger {
flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
val pkgBuilder = PackagedProgram.newBuilder
- .setUserClassPaths(
- Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
- )
+ .setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
.setEntryPointClassName(
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
)
@@ -275,14 +274,6 @@ trait FlinkClientTrait extends Logger {
if (!FsOperator.lfs.exists(pythonVenv)) {
throw new RuntimeException(s"$pythonVenv File does not exist")
}
- // including $app/lib
- val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
- if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
- val localLibUrl = new File(localLib).listFiles().map(_.toURI.toURL).toList
- pkgBuilder.setUserClassPaths(
- Lists.newArrayList(localLibUrl: _*)
- )
- }
flinkConfig
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
@@ -600,13 +591,4 @@ trait FlinkClientTrait extends Logger {
clientWrapper.triggerSavepoint(jobID, savepointPath, savepointRequest.nativeFormat).get()
}
- private[client] def includingPipelineJars(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration) = {
- val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
- if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
- flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
- }
- }
-
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 4966cbe5e..f565392c7 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -73,7 +73,7 @@ object FlinkSqlValidator extends Logger {
var hasInsert = false
for (call <- sqlCommands) {
val args = call.operands.head
- lazy val command = call.command
+ val command = call.command
command match {
case SET | RESET =>
if (command == SET && args == TableConfigOptions.TABLE_SQL_DIALECT.key()) {