Posted to commits@streampark.apache.org by be...@apache.org on 2023/11/05 14:08:03 UTC

(incubator-streampark) branch dev updated: [Improve] submit flink job userclassPaths improvement (#3313)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5b727cc26 [Improve] submit flink job userclassPaths improvement (#3313)
5b727cc26 is described below

commit 5b727cc26dd296ff16072359b8fb52e3f3f48b36
Author: benjobs <be...@apache.org>
AuthorDate: Sun Nov 5 22:07:57 2023 +0800

    [Improve] submit flink job userclassPaths improvement (#3313)
    
    * [Improve] submit flink job userclassPaths improvement
    
    * userClasspath minor improvement
---
 .../flink/client/bean/SubmitRequest.scala          |  14 +-
 .../scala/org/apache/commons/cli/CommandLine.java  | 197 ---------------------
 .../flink/client/impl/YarnApplicationClient.scala  |   4 -
 .../flink/client/trait/FlinkClientTrait.scala      |  24 +--
 .../streampark/flink/core/FlinkSqlValidator.scala  |   2 +-
 5 files changed, 15 insertions(+), 226 deletions(-)
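
The net effect of the patch: the user classpath is now assembled once, in SubmitRequest.classPaths (Flink's own lib jars plus the per-application $APP_WORKSPACE/<id>/lib directory), and handed to the client in a single place instead of being patched in separately per client. Below is a minimal sketch of that assembly; the names classPaths and flinkLibs come from the diff, while the standalone helper, its name, and its parameters are illustrative only.

    import java.io.File
    import java.net.URL
    import scala.util.Try

    // Sketch: combine Flink's lib jars with the application's local lib directory.
    // A missing or unreadable <id>/lib contributes nothing (Try falls back to an empty list),
    // which mirrors the getOrElse fallback in the diff below.
    def assembleClassPaths(flinkLibs: List[URL], appLibDir: String): List[URL] = {
      val appLib = Try(new File(appLibDir).listFiles().map(_.toURI.toURL).toList)
        .getOrElse(List.empty[URL])
      flinkLibs ++ appLib
    }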

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 25e17120d..2c1a72e51 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.client.bean
 
 import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion, Workspace}
+import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
 import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums._
 import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, PropertiesUtils}
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointResto
 import javax.annotation.Nullable
 
 import java.io.File
+import java.net.URL
 import java.util.{Map => JavaMap}
 
 import scala.collection.convert.ImplicitConversions._
@@ -55,7 +56,8 @@ case class SubmitRequest(
     @Nullable k8sSubmitParam: KubernetesSubmitParam,
     @Nullable extraParameter: JavaMap[String, Any]) {
 
-  lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
+  private[this] lazy val appProperties: Map[String, String] = getParameterMap(
+    KEY_FLINK_PROPERTY_PREFIX)
 
   lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX)
 
@@ -68,9 +70,15 @@ case class SubmitRequest(
   lazy val effectiveAppName: String =
     if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else this.appName
 
+  lazy val classPaths: List[URL] = {
+    val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
+    val lib = Try(new File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL])
+    flinkVersion.flinkLibs ++ lib
+  }
+
   lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
 
-  lazy val allowNonRestoredState = Try(
+  lazy val allowNonRestoredState: Boolean = Try(
     properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean)
     .getOrElse(false)
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java
deleted file mode 100644
index f1380fd9a..000000000
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.commons.cli;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-public class CommandLine implements Serializable {
-
-    /** The serial version UID. */
-    private static final long serialVersionUID = 1L;
-
-    /** The unrecognized options/arguments */
-    private final List<String> args = new LinkedList<>();
-
-    /** The processed options */
-    private final List<Option> options = new ArrayList<>();
-
-    /**
-     * Creates a command line.
-     */
-    protected CommandLine() {
-        // nothing to do
-    }
-
-    /**
-     * Retrieve the map of values associated to the option. This is convenient for options specifying Java properties like
-     * <code>-Dparam1=value1
-     * -Dparam2=value2</code>. The first argument of the option is the key, and the 2nd argument is the value. If the option
-     * has only one argument ({@code -Dfoo}) it is considered as a boolean flag and the value is {@code "true"}.
-     *
-     * @param option name of the option.
-     * @return The Properties mapped by the option, never {@code null} even if the option doesn't exists.
-     * @since 1.5.0
-     */
-    public Properties getOptionProperties(final Option option) {
-        final Properties props = new Properties();
-
-        for (final Option processedOption : options) {
-            if (processedOption.equals(option)) {
-                final List<String> values = processedOption.getValuesList();
-                if (values.size() >= 2) {
-                    // use the first 2 arguments as the key/value pair
-                    props.put(values.get(0), values.get(1));
-                } else if (values.size() == 1) {
-                    // no explicit value, handle it as a boolean
-                    props.put(values.get(0), "true");
-                }
-            }
-        }
-
-        return props;
-    }
-
-    /**
-     * Retrieve the first argument, if any, of this option.
-     *
-     * @param option the name of the option.
-     * @return Value of the argument if option is set, and has an argument, otherwise null.
-     * @since 1.5.0
-     */
-    public String getOptionValue(final Option option) {
-        if (option == null) {
-            return null;
-        }
-        final String[] values = getOptionValues(option);
-        return values == null ? null : values[0];
-    }
-
-    /**
-     * Retrieve the first argument, if any, of an option.
-     *
-     * @param option name of the option.
-     * @param defaultValue is the default value to be returned if the option is not specified.
-     * @return Value of the argument if option is set, and has an argument, otherwise {@code defaultValue}.
-     * @since 1.5.0
-     */
-    public String getOptionValue(final Option option, final String defaultValue) {
-        final String answer = getOptionValue(option);
-        return answer != null ? answer : defaultValue;
-    }
-
-    /**
-     * Retrieves the array of values, if any, of an option.
-     *
-     * @param option string name of the option.
-     * @return Values of the argument if option is set, and has an argument, otherwise null.
-     * @since 1.5.0
-     */
-    public String[] getOptionValues(final Option option) {
-        final List<String> values = new ArrayList<>();
-
-        for (final Option processedOption : options) {
-            if (processedOption.equals(option)) {
-                values.addAll(processedOption.getValuesList());
-            }
-        }
-
-        return values.isEmpty() ? null : values.toArray(new String[values.size()]);
-    }
-
-    /**
-     * Return a version of this {@code Option} converted to a particular type.
-     *
-     * @param opt the name of the option.
-     * @return the value parsed into a particular object.
-     * @throws ParseException if there are problems turning the option value into the desired type
-     * @see PatternOptionBuilder
-     * @since 1.5.0
-     */
-    public Object getParsedOptionValue(final char opt) throws ParseException {
-        return getParsedOptionValue(String.valueOf(opt));
-    }
-
-    /**
-     * Return a version of this {@code Option} converted to a particular type.
-     *
-     * @param option the name of the option.
-     * @return the value parsed into a particular object.
-     * @throws ParseException if there are problems turning the option value into the desired type
-     * @see PatternOptionBuilder
-     * @since 1.5.0
-     */
-    public Object getParsedOptionValue(final Option option) throws ParseException {
-        if (option == null) {
-            return null;
-        }
-        final String res = getOptionValue(option);
-        if (res == null) {
-            return null;
-        }
-        return TypeHandler.createValue(res, option.getType());
-    }
-
-
-    /**
-     * Tests to see if an option has been set.
-     *
-     * @param opt the option to check.
-     * @return true if set, false if not.
-     * @since 1.5.0
-     */
-    public boolean hasOption(final Option opt) {
-        return options.contains(opt);
-    }
-
-
-    /**
-     * Return a version of this {@code Option} converted to a particular type.
-     *
-     * @param opt the name of the option.
-     * @return the value parsed into a particular object.
-     * @throws ParseException if there are problems turning the option value into the desired type
-     * @see PatternOptionBuilder
-     * @since 1.2
-     */
-    public Object getParsedOptionValue(final String opt) throws ParseException {
-        return getParsedOptionValue(resolveOption(opt));
-    }
-
-
-    /**
-     * Retrieves the option object given the long or short option as a String
-     *
-     * @param opt short or long name of the option.
-     * @return Canonicalized option.
-     */
-    private Option resolveOption(String opt) {
-        opt = Util.stripLeadingHyphens(opt);
-        for (final Option option : options) {
-            if (opt.equals(option.getOpt()) || opt.equals(option.getLongOpt())) {
-                return option;
-            }
-
-        }
-        return null;
-    }
-
-}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f7b38cdde..3470e8e67 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -101,10 +101,6 @@ object YarnApplicationClient extends YarnClientTrait {
       if (!FsOperator.hdfs.exists(pyVenv)) {
         throw new RuntimeException(s"$pyVenv File does not exist")
       }
-
-      // including $app/lib
-      includingPipelineJars(submitRequest, flinkConfig)
-
       // yarn.ship-files
       val shipFiles = new util.ArrayList[String]()
       shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 435d882c7..020301b73 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait`
 import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
+import org.apache.streampark.common.enums._
 import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util._
 import org.apache.streampark.flink.client.bean._
@@ -43,6 +43,7 @@ import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
 import java.io.File
+import java.net.URL
 import java.util
 import java.util.{Collections, List => JavaList, Map => JavaMap}
 
@@ -256,9 +257,7 @@ trait FlinkClientTrait extends Logger {
       flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
 
     val pkgBuilder = PackagedProgram.newBuilder
-      .setUserClassPaths(
-        Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
-      )
+      .setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
       .setEntryPointClassName(
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
       )
@@ -275,14 +274,6 @@ trait FlinkClientTrait extends Logger {
         if (!FsOperator.lfs.exists(pythonVenv)) {
           throw new RuntimeException(s"$pythonVenv File does not exist")
         }
-        // including $app/lib
-        val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-        if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-          val localLibUrl = new File(localLib).listFiles().map(_.toURI.toURL).toList
-          pkgBuilder.setUserClassPaths(
-            Lists.newArrayList(localLibUrl: _*)
-          )
-        }
         flinkConfig
           // python.archives
           .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
@@ -600,13 +591,4 @@ trait FlinkClientTrait extends Logger {
     clientWrapper.triggerSavepoint(jobID, savepointPath, savepointRequest.nativeFormat).get()
   }
 
-  private[client] def includingPipelineJars(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration) = {
-    val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-    if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-      flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
-    }
-  }
-
 }
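
The removals above are the core of the change. In the old Python branch, setUserClassPaths was invoked a second time with only the $app/lib URLs; assuming the builder's setter assigns a fresh list rather than appending (treat that as an assumption about Flink's PackagedProgram.Builder), the earlier flinkLibs entry was replaced. The includingPipelineJars helper similarly pushed the lib directory into pipeline.jars as a side channel. Pre-merging everything into SubmitRequest.classPaths and setting it once avoids both. A before/after sketch, using names from the diff and resting on the assumption just stated:

    // Before (sketch): two calls on the same builder; the second replaces the first list.
    // pkgBuilder.setUserClassPaths(Lists.newArrayList(flinkLibs: _*))       // set #1
    // pkgBuilder.setUserClassPaths(Lists.newArrayList(localLibUrl: _*))     // set #2 wins
    //
    // After (sketch): one call with the pre-merged SubmitRequest.classPaths.
    // pkgBuilder.setUserClassPaths(Lists.newArrayList(submitRequest.classPaths: _*))
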
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 4966cbe5e..f565392c7 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -73,7 +73,7 @@ object FlinkSqlValidator extends Logger {
     var hasInsert = false
     for (call <- sqlCommands) {
       val args = call.operands.head
-      lazy val command = call.command
+      val command = call.command
       command match {
         case SET | RESET =>
           if (command == SET && args == TableConfigOptions.TABLE_SQL_DIALECT.key()) {
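
The FlinkSqlValidator change is a small cleanup: command is pattern-matched on the very next line, so marking it lazy never deferred any work and only added a lazy-initialization guard. An illustrative before/after, not part of the patch text itself:

    // Before: lazy val command = call.command   // initialized on first use, i.e. immediately below
    // After:  val command = call.command        // same behavior, no lazy guard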