Posted to commits@flink.apache.org by di...@apache.org on 2020/05/13 01:22:48 UTC

[flink] branch master updated: [FLINK-17612][python][sql-client] Support Python command line options in SQL Client. (#12077)

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 722f844  [FLINK-17612][python][sql-client] Support Python command line options in SQL Client. (#12077)
722f844 is described below

commit 722f8445b7f8f2df90af9e8c4b3cfa231d71129d
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Wed May 13 09:22:29 2020 +0800

    [FLINK-17612][python][sql-client] Support Python command line options in SQL Client. (#12077)
---
 docs/dev/table/sqlClient.md                        | 79 ++++++++++++++++++++++
 docs/dev/table/sqlClient.zh.md                     | 79 ++++++++++++++++++++++
 .../org/apache/flink/table/client/SqlClient.java   | 13 ++++
 .../apache/flink/table/client/cli/CliOptions.java  | 11 ++-
 .../flink/table/client/cli/CliOptionsParser.java   | 43 +++++++++++-
 5 files changed, 221 insertions(+), 4 deletions(-)

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index daa1419..8331efe 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -136,6 +136,10 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            properties.
      -h,--help                             Show the help message with
                                            descriptions of all options.
+     -hist,--history <History file path>   The file which you want to save the
+                                           command history into. If not
+                                           specified, we will auto-generate one
+                                           under your user's home directory.
      -j,--jar <JAR file>                   A JAR file to be imported into the
                                            session. The file might contain
                                            user-defined classes needed for the
@@ -149,8 +153,83 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            statements such as functions, table
                                            sources, or sinks. Can be used
                                            multiple times.
+     -pyarch,--pyArchives <arg>            Add python archive files for the job.
+                                           The archive files will be extracted
+                                           to the working directory of the python
+                                           UDF worker. Currently only zip format
+                                           is supported. For each archive file, a
+                                           target directory can be specified. If
+                                           the target directory name is
+                                           specified, the archive file will be
+                                           extracted to a directory with the
+                                           specified name. Otherwise, the archive
+                                           file will be extracted to a directory
+                                           with the same name as the archive
+                                           file. The files uploaded via this
+                                           option are accessible via relative
+                                           path. '#' can be used as the separator
+                                           between the archive file path and the
+                                           target directory name. Comma (',') can
+                                           be used as the separator to specify
+                                           multiple archive files. This option
+                                           can be used to upload the virtual
+                                           environment and the data files used in
+                                           Python UDFs (e.g.: --pyArchives
+                                           file:///tmp/py37.zip,file:///tmp/data
+                                           .zip#data --pyExecutable
+                                           py37.zip/py37/bin/python). The data
+                                           files can be accessed in Python UDFs,
+                                           e.g.: f = open('data/data.txt', 'r').
+     -pyexec,--pyExecutable <arg>          Specify the path of the python
+                                           interpreter used to execute the
+                                           python UDF worker (e.g.:
+                                           --pyExecutable
+                                           /usr/local/bin/python3). The python
+                                           UDF worker depends on Python 3.5+,
+                                           Apache Beam (version == 2.19.0), Pip
+                                           (version >= 7.1.0) and SetupTools
+                                           (version >= 37.0.0). Please ensure
+                                           that the specified environment meets
+                                           the above requirements.
+     -pyfs,--pyFiles <pythonFiles>         Attach custom python files for the
+                                           job. These files will be added to the
+                                           PYTHONPATH of both the local client
+                                           and the remote python UDF worker. The
+                                           standard python resource file
+                                           suffixes such as .py/.egg/.zip, as
+                                           well as directories, are all
+                                           supported. Comma (',') can be used as
+                                           the separator to specify multiple
+                                           files (e.g.: --pyFiles
+                                           file:///tmp/myresource.zip,hdfs:///$n
+                                           amenode_address/myresource2.zip).
+     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
+                                           defines the third-party dependencies.
+                                           These dependencies will be installed
+                                           and added to the PYTHONPATH of the
+                                           python UDF worker. A directory which
+                                           contains the installation packages of
+                                           these dependencies can optionally be
+                                           specified. Use '#' as the separator
+                                           if the optional parameter exists
+                                           (e.g.: --pyRequirements
+                                           file:///tmp/requirements.txt#file:///
+                                           tmp/cached_dir).
      -s,--session <session identifier>     The identifier for a session.
                                            'default' is the default identifier.
+     -u,--update <SQL update statement>    Experimental (for testing only!):
+                                           Instructs the SQL Client to
+                                           immediately execute the given update
+                                           statement after starting up. The
+                                           process is shut down after the
+                                           statement has been submitted to the
+                                           cluster and returns an appropriate
+                                           return code. Currently, this feature
+                                           is only supported for INSERT INTO
+                                           statements that declare the target
+                                           sink table.
 {% endhighlight %}
 
 {% top %}
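
The four new options are imported from CliFrontendParser (see the CliOptionsParser diff below), so they accept the same values as the Python options of the flink run command line. As a hypothetical embedded-mode invocation combining them, with file paths borrowed from the help text above:

    ./bin/sql-client.sh embedded \
        -pyfs file:///tmp/myresource.zip \
        -pyreq file:///tmp/requirements.txt#file:///tmp/cached_dir \
        -pyarch file:///tmp/py37.zip \
        -pyexec py37.zip/py37/bin/python
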
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index 44355c8..c234390 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -137,6 +137,10 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            properties.
      -h,--help                             Show the help message with
                                            descriptions of all options.
+     -hist,--history <History file path>   The file which you want to save the
+                                           command history into. If not
+                                           specified, we will auto-generate one
+                                           under your user's home directory.
      -j,--jar <JAR file>                   A JAR file to be imported into the
                                            session. The file might contain
                                            user-defined classes needed for the
@@ -150,8 +154,83 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            statements such as functions, table
                                            sources, or sinks. Can be used
                                            multiple times.
+     -pyarch,--pyArchives <arg>            Add python archive files for the job.
+                                           The archive files will be extracted
+                                           to the working directory of the python
+                                           UDF worker. Currently only zip format
+                                           is supported. For each archive file, a
+                                           target directory can be specified. If
+                                           the target directory name is
+                                           specified, the archive file will be
+                                           extracted to a directory with the
+                                           specified name. Otherwise, the archive
+                                           file will be extracted to a directory
+                                           with the same name as the archive
+                                           file. The files uploaded via this
+                                           option are accessible via relative
+                                           path. '#' can be used as the separator
+                                           between the archive file path and the
+                                           target directory name. Comma (',') can
+                                           be used as the separator to specify
+                                           multiple archive files. This option
+                                           can be used to upload the virtual
+                                           environment and the data files used in
+                                           Python UDFs (e.g.: --pyArchives
+                                           file:///tmp/py37.zip,file:///tmp/data
+                                           .zip#data --pyExecutable
+                                           py37.zip/py37/bin/python). The data
+                                           files can be accessed in Python UDFs,
+                                           e.g.: f = open('data/data.txt', 'r').
+     -pyexec,--pyExecutable <arg>          Specify the path of the python
+                                           interpreter used to execute the
+                                           python UDF worker (e.g.:
+                                           --pyExecutable
+                                           /usr/local/bin/python3). The python
+                                           UDF worker depends on Python 3.5+,
+                                           Apache Beam (version == 2.19.0), Pip
+                                           (version >= 7.1.0) and SetupTools
+                                           (version >= 37.0.0). Please ensure
+                                           that the specified environment meets
+                                           the above requirements.
+     -pyfs,--pyFiles <pythonFiles>         Attach custom python files for the
+                                           job. These files will be added to the
+                                           PYTHONPATH of both the local client
+                                           and the remote python UDF worker. The
+                                           standard python resource file
+                                           suffixes such as .py/.egg/.zip, as
+                                           well as directories, are all
+                                           supported. Comma (',') can be used as
+                                           the separator to specify multiple
+                                           files (e.g.: --pyFiles
+                                           file:///tmp/myresource.zip,hdfs:///$n
+                                           amenode_address/myresource2.zip).
+     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
+                                           defines the third-party dependencies.
+                                           These dependencies will be installed
+                                           and added to the PYTHONPATH of the
+                                           python UDF worker. A directory which
+                                           contains the installation packages of
+                                           these dependencies can optionally be
+                                           specified. Use '#' as the separator
+                                           if the optional parameter exists
+                                           (e.g.: --pyRequirements
+                                           file:///tmp/requirements.txt#file:///
+                                           tmp/cached_dir).
      -s,--session <session identifier>     The identifier for a session.
                                            'default' is the default identifier.
+     -u,--update <SQL update statement>    Experimental (for testing only!):
+                                           Instructs the SQL Client to
+                                           immediately execute the given update
+                                           statement after starting up. The
+                                           process is shut down after the
+                                           statement has been submitted to the
+                                           cluster and returns an appropriate
+                                           return code. Currently, this feature
+                                           is only supported for INSERT INTO
+                                           statements that declare the target
+                                           sink table.
 {% endhighlight %}
 
 {% top %}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 3f68dc6..627e70b 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.client.cli.CliClient;
 import org.apache.flink.table.client.cli.CliOptions;
 import org.apache.flink.table.client.cli.CliOptionsParser;
@@ -36,7 +37,12 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.entries.ConfigurationEntry.create;
+import static org.apache.flink.table.client.config.entries.ConfigurationEntry.merge;
 
 /**
  * SQL Client for submitting SQL statements. The client can be executed in two
@@ -90,6 +96,7 @@ public class SqlClient {
 
 			// create CLI client with session environment
 			final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
+			appendPythonConfig(sessionEnv, options.getPythonConfiguration());
 			final SessionContext context;
 			if (options.getSessionId() == null) {
 				context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
@@ -166,6 +173,12 @@ public class SqlClient {
 		}
 	}
 
+	private static void appendPythonConfig(Environment env, Configuration pythonConfiguration) {
+		Map<String, Object> pythonConfig = new HashMap<>(pythonConfiguration.toMap());
+		Map<String, Object> combinedConfig = new HashMap<>(merge(env.getConfiguration(), create(pythonConfig)).asMap());
+		env.setConfiguration(combinedConfig);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public static void main(String[] args) {
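
The appendPythonConfig helper folds the parsed Python options into the session environment's configuration, so options passed on the command line behave as if they had been declared in the environment YAML file. A map-level sketch of that merge, assuming (as command-line semantics suggest, though this diff does not show ConfigurationEntry.merge itself) that the Python entries override duplicate keys:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: combine the session configuration with the Python
    // options parsed from the command line. putAll lets the Python
    // entries win on key collisions, mirroring the intended effect of
    // merge(env.getConfiguration(), create(pythonConfig)) above.
    final class PythonConfigMergeSketch {
        static Map<String, Object> combine(
                Map<String, Object> sessionConfig,
                Map<String, String> pythonConfig) {
            Map<String, Object> combined = new HashMap<>(sessionConfig);
            combined.putAll(pythonConfig);
            return combined;
        }
    }
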
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
index 4eaed62..c211f79 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.net.URL;
 import java.util.List;
 
@@ -35,6 +37,7 @@ public class CliOptions {
 	private final List<URL> libraryDirs;
 	private final String updateStatement;
 	private final String historyFilePath;
+	private final Configuration pythonConfiguration;
 
 	public CliOptions(
 			boolean isPrintHelp,
@@ -44,7 +47,8 @@ public class CliOptions {
 			List<URL> jars,
 			List<URL> libraryDirs,
 			String updateStatement,
-			String historyFilePath) {
+			String historyFilePath,
+			Configuration pythonConfiguration) {
 		this.isPrintHelp = isPrintHelp;
 		this.sessionId = sessionId;
 		this.environment = environment;
@@ -53,6 +57,7 @@ public class CliOptions {
 		this.libraryDirs = libraryDirs;
 		this.updateStatement = updateStatement;
 		this.historyFilePath = historyFilePath;
+		this.pythonConfiguration = pythonConfiguration;
 	}
 
 	public boolean isPrintHelp() {
@@ -86,4 +91,8 @@ public class CliOptions {
 	public String getHistoryFilePath() {
 		return historyFilePath;
 	}
+
+	public Configuration getPythonConfiguration() {
+		return pythonConfiguration;
+	}
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index a7626bb..2c6b7ef 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.client.SqlClientException;
 
@@ -29,11 +30,18 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 
 import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
+
 /**
  * Parser for command line options.
  */
@@ -145,6 +153,10 @@ public class CliOptionsParser {
 		options.addOption(OPTION_LIBRARY);
 		options.addOption(OPTION_UPDATE);
 		options.addOption(OPTION_HISTORY);
+		options.addOption(PYFILES_OPTION);
+		options.addOption(PYREQUIREMENTS_OPTION);
+		options.addOption(PYARCHIVE_OPTION);
+		options.addOption(PYEXEC_OPTION);
 		return options;
 	}
 
@@ -154,6 +166,10 @@ public class CliOptionsParser {
 		options.addOption(OPTION_ENVIRONMENT);
 		options.addOption(OPTION_UPDATE);
 		options.addOption(OPTION_HISTORY);
+		options.addOption(PYFILES_OPTION);
+		options.addOption(PYREQUIREMENTS_OPTION);
+		options.addOption(PYARCHIVE_OPTION);
+		options.addOption(PYEXEC_OPTION);
 		return options;
 	}
 
@@ -162,6 +178,10 @@ public class CliOptionsParser {
 		options.addOption(OPTION_DEFAULTS);
 		options.addOption(OPTION_JAR);
 		options.addOption(OPTION_LIBRARY);
+		options.addOption(PYFILES_OPTION);
+		options.addOption(PYREQUIREMENTS_OPTION);
+		options.addOption(PYARCHIVE_OPTION);
+		options.addOption(PYEXEC_OPTION);
 		return options;
 	}
 
@@ -249,7 +269,8 @@ public class CliOptionsParser {
 				checkUrls(line, CliOptionsParser.OPTION_JAR),
 				checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
 				line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
-				line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt())
+				line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
+				getPythonConfiguration(line)
 			);
 		}
 		catch (ParseException e) {
@@ -269,7 +290,8 @@ public class CliOptionsParser {
 				checkUrls(line, CliOptionsParser.OPTION_JAR),
 				checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
 				line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
-				line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt())
+				line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
+				getPythonConfiguration(line)
 			);
 		}
 		catch (ParseException e) {
@@ -289,7 +311,8 @@ public class CliOptionsParser {
 				checkUrls(line, CliOptionsParser.OPTION_JAR),
 				checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
 				null,
-				null
+				null,
+				getPythonConfiguration(line)
 			);
 		}
 		catch (ParseException e) {
@@ -331,4 +354,18 @@ public class CliOptionsParser {
 		}
 		return sessionId;
 	}
+
+	private static Configuration getPythonConfiguration(CommandLine line) {
+		try {
+			Class<?> clazz = Class.forName(
+				"org.apache.flink.python.util.PythonDependencyUtils",
+				true,
+				Thread.currentThread().getContextClassLoader());
+			Method parsePythonDependencyConfiguration =
+				clazz.getMethod("parsePythonDependencyConfiguration", CommandLine.class);
+			return (Configuration) parsePythonDependencyConfiguration.invoke(null, line);
+		} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+			throw new SqlClientException("Failed to parse the Python command line options.", e);
+		}
+	}
 }
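
The reflective lookup in getPythonConfiguration exists presumably so that flink-python remains an optional dependency of the SQL Client: the class compiles and loads without the flink-python jar on the classpath, and the lookup only fails (wrapped in a SqlClientException) when a user actually passes a Python option. A self-contained sketch of the same pattern, with illustrative names that are not Flink API:

    import java.lang.reflect.Method;

    // Sketch of the optional-dependency pattern used by
    // getPythonConfiguration: resolve a static method reflectively so
    // the calling class still loads when the optional jar is absent.
    final class OptionalStaticCall {
        static Object invoke(String className, String methodName,
                Class<?> argType, Object arg) {
            try {
                Class<?> clazz = Class.forName(
                    className, true, Thread.currentThread().getContextClassLoader());
                Method method = clazz.getMethod(methodName, argType);
                return method.invoke(null, arg); // null receiver: static method
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                    "Optional dependency " + className + " is unavailable.", e);
            }
        }
    }
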