Posted to commits@flink.apache.org by di...@apache.org on 2020/05/13 01:22:48 UTC
[flink] branch master updated: [FLINK-17612][python][sql-client]
Support Python command line options in SQL Client. (#12077)
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 722f844 [FLINK-17612][python][sql-client] Support Python command line options in SQL Client. (#12077)
722f844 is described below
commit 722f8445b7f8f2df90af9e8c4b3cfa231d71129d
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Wed May 13 09:22:29 2020 +0800
[FLINK-17612][python][sql-client] Support Python command line options in SQL Client. (#12077)
---
docs/dev/table/sqlClient.md | 79 ++++++++++++++++++++++
docs/dev/table/sqlClient.zh.md | 79 ++++++++++++++++++++++
.../org/apache/flink/table/client/SqlClient.java | 13 ++++
.../apache/flink/table/client/cli/CliOptions.java | 11 ++-
.../flink/table/client/cli/CliOptionsParser.java | 43 +++++++++++-
5 files changed, 221 insertions(+), 4 deletions(-)
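In short: the SQL Client now accepts the same Python dependency options as "flink run" (-pyfs/--pyFiles, -pyreq/--pyRequirements, -pyarch/--pyArchives, -pyexec/--pyExecutable), so Python UDF dependencies can be configured when the client starts, e.g. something like ./bin/sql-client.sh embedded -pyfs /tmp/my_udfs.py -pyexec /usr/local/bin/python3 (paths here are hypothetical).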
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index daa1419..8331efe 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -136,6 +136,10 @@ Mode "embedded" submits Flink jobs from the local machine.
properties.
-h,--help Show the help message with
descriptions of all options.
+ -hist,--history <History file path> The file which you want to save the
+ command history into. If not
+ specified, we will auto-generate one
+ under your user's home directory.
-j,--jar <JAR file> A JAR file to be imported into the
session. The file might contain
user-defined classes needed for the
@@ -149,8 +153,83 @@ Mode "embedded" submits Flink jobs from the local machine.
statements such as functions, table
sources, or sinks. Can be used
multiple times.
+ -pyarch,--pyArchives <arg> Add python archive files for job. The
+ archive files will be extracted to
+ the working directory of python UDF
+ worker. Currently only zip-format is
+ supported. For each archive file, a
+ target directory can be specified.
+ If the target directory name is
+ specified, the archive file will be
+ extracted to a directory with the
+ specified name. Otherwise, the
+ archive file will be extracted to a
+ directory with the same name as the
+ archive file. The files uploaded via
+ this option are accessible via
+ relative path. '#' could be used as
+ the separator of the archive file
+ path and the target directory name.
+ Comma (',') could be used as the
+ separator to specify multiple archive
+ files. This option can be used to
+ upload a virtual environment and the
+ data files used in Python UDFs (e.g.:
+ --pyArchives
+ file:///tmp/py37.zip,file:///tmp/data
+ .zip#data --pyExecutable
+ py37.zip/py37/bin/python). The data
+ files could be accessed in Python
+ UDFs, e.g.: f = open('data/data.txt',
+ 'r').
+ -pyexec,--pyExecutable <arg> Specify the path of the python
+ interpreter used to execute the
+ python UDF worker (e.g.:
+ --pyExecutable
+ /usr/local/bin/python3). The python
+ UDF worker depends on Python 3.5+,
+ Apache Beam (version == 2.19.0), Pip
+ (version >= 7.1.0) and SetupTools
+ (version >= 37.0.0). Please ensure
+ that the specified environment meets
+ the above requirements.
+ -pyfs,--pyFiles <pythonFiles> Attach custom python files for job.
+ These files will be added to the
+ PYTHONPATH of both the local client
+ and the remote python UDF worker. The
+ standard python resource file
+ suffixes such as .py/.egg/.zip and
+ directories are all supported. Comma
+ (',') could be used as the separator
+ to specify multiple files (e.g.:
+ --pyFiles
+ file:///tmp/myresource.zip,hdfs:///$n
+ amenode_address/myresource2.zip).
+ -pyreq,--pyRequirements <arg> Specify a requirements.txt file which
+ defines the third-party dependencies.
+ These dependencies will be installed
+ and added to the PYTHONPATH of the
+ python UDF worker. A directory that
+ contains the installation packages of
+ these dependencies can optionally be
+ specified. Use '#' as the separator
+ if the optional parameter exists
+ (e.g.: --pyRequirements
+ file:///tmp/requirements.txt#file:///
+ tmp/cached_dir).
-s,--session <session identifier> The identifier for a session.
'default' is the default identifier.
+ -u,--update <SQL update statement> Experimental (for testing only!):
+ Instructs the SQL Client to
+ immediately execute the given update
+ statement after starting up. The
+ process is shut down after the
+ statement has been submitted to the
+ cluster and returns an appropriate
+ return code. Currently, this feature
+ is only supported for INSERT INTO
+ statements that declare the target
+ sink table.
{% endhighlight %}
{% top %}
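To make the --pyArchives semantics above concrete, here is a minimal, hypothetical sketch of a Python UDF that reads a data file shipped via --pyArchives file:///tmp/data.zip#data (it assumes PyFlink's udf decorator as of Flink 1.10+; the function and file names are illustrative only):

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
    def tag(line):
        # 'data' is the target directory name given after '#'. The archive is
        # extracted into the working directory of the Python UDF worker, so
        # its contents are reachable through this relative path.
        with open('data/data.txt', 'r') as f:
            return line + f.read()

The UDF itself is still registered the usual way (e.g. in the session environment file); the new options only control how its Python dependencies reach the UDF worker.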
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index 44355c8..c234390 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -137,6 +137,10 @@ Mode "embedded" submits Flink jobs from the local machine.
properties.
-h,--help Show the help message with
descriptions of all options.
+ -hist,--history <History file path> The file which you want to save the
+ command history into. If not
+ specified, we will auto-generate one
+ under your user's home directory.
-j,--jar <JAR file> A JAR file to be imported into the
session. The file might contain
user-defined classes needed for the
@@ -150,8 +154,83 @@ Mode "embedded" submits Flink jobs from the local machine.
statements such as functions, table
sources, or sinks. Can be used
multiple times.
+ -pyarch,--pyArchives <arg> Add python archive files for job. The
+ archive files will be extracted to
+ the working directory of python UDF
+ worker. Currently only zip-format is
+ supported. For each archive file, a
+ target directory can be specified.
+ If the target directory name is
+ specified, the archive file will be
+ extracted to a directory with the
+ specified name. Otherwise, the
+ archive file will be extracted to a
+ directory with the same name as the
+ archive file. The files uploaded via
+ this option are accessible via
+ relative path. '#' could be used as
+ the separator of the archive file
+ path and the target directory name.
+ Comma (',') could be used as the
+ separator to specify multiple archive
+ files. This option can be used to
+ upload a virtual environment and the
+ data files used in Python UDFs (e.g.:
+ --pyArchives
+ file:///tmp/py37.zip,file:///tmp/data
+ .zip#data --pyExecutable
+ py37.zip/py37/bin/python). The data
+ files could be accessed in Python
+ UDFs, e.g.: f = open('data/data.txt',
+ 'r').
+ -pyexec,--pyExecutable <arg> Specify the path of the python
+ interpreter used to execute the
+ python UDF worker (e.g.:
+ --pyExecutable
+ /usr/local/bin/python3). The python
+ UDF worker depends on Python 3.5+,
+ Apache Beam (version == 2.19.0), Pip
+ (version >= 7.1.0) and SetupTools
+ (version >= 37.0.0). Please ensure
+ that the specified environment meets
+ the above requirements.
+ -pyfs,--pyFiles <pythonFiles> Attach custom python files for job.
+ These files will be added to the
+ PYTHONPATH of both the local client
+ and the remote python UDF worker. The
+ standard python resource file
+ suffixes such as .py/.egg/.zip and
+ directories are all supported. Comma
+ (',') could be used as the separator
+ to specify multiple files (e.g.:
+ --pyFiles
+ file:///tmp/myresource.zip,hdfs:///$n
+ amenode_address/myresource2.zip).
+ -pyreq,--pyRequirements <arg> Specify a requirements.txt file which
+ defines the third-party dependencies.
+ These dependencies will be installed
+ and added to the PYTHONPATH of the
+ python UDF worker. A directory that
+ contains the installation packages of
+ these dependencies can optionally be
+ specified. Use '#' as the separator
+ if the optional parameter exists
+ (e.g.: --pyRequirements
+ file:///tmp/requirements.txt#file:///
+ tmp/cached_dir).
-s,--session <session identifier> The identifier for a session.
'default' is the default identifier.
+ -u,--update <SQL update statement> Experimental (for testing only!):
+ Instructs the SQL Client to
+ immediately execute the given update
+ statement after starting up. The
+ process is shut down after the
+ statement has been submitted to the
+ cluster and returns an appropriate
+ return code. Currently, this feature
+ is only supported for INSERT INTO
+ statements that declare the target
+ sink table.
{% endhighlight %}
{% top %}
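Files attached with --pyFiles behave differently from archives: they are appended to the PYTHONPATH of both the local client and the remote UDF worker, so they can be imported directly. A hypothetical sketch (the myresource module name is illustrative, assumed to live inside a myresource.zip passed via --pyFiles):

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf
    import myresource  # importable because the attached zip is on the PYTHONPATH

    @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
    def normalize(s):
        return myresource.normalize(s)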
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 3f68dc6..627e70b 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.client;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.client.cli.CliClient;
import org.apache.flink.table.client.cli.CliOptions;
import org.apache.flink.table.client.cli.CliOptionsParser;
@@ -36,7 +37,12 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.entries.ConfigurationEntry.create;
+import static org.apache.flink.table.client.config.entries.ConfigurationEntry.merge;
/**
* SQL Client for submitting SQL statements. The client can be executed in two
@@ -90,6 +96,7 @@ public class SqlClient {
// create CLI client with session environment
final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
+ appendPythonConfig(sessionEnv, options.getPythonConfiguration());
final SessionContext context;
if (options.getSessionId() == null) {
context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
@@ -166,6 +173,12 @@ public class SqlClient {
}
}
+ private static void appendPythonConfig(Environment env, Configuration pythonConfiguration) {
+ Map<String, Object> pythonConfig = new HashMap<>(pythonConfiguration.toMap());
+ Map<String, Object> combinedConfig = new HashMap<>(merge(env.getConfiguration(), create(pythonConfig)).asMap());
+ env.setConfiguration(combinedConfig);
+ }
+
// --------------------------------------------------------------------------------------------
public static void main(String[] args) {
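A note on the change above: the Python options parsed from the command line arrive as a Flink Configuration, and appendPythonConfig() merges them into the session Environment's configuration entries, so the Python settings take effect effectively as if they had been declared in the environment YAML file.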
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
index 4eaed62..c211f79 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.client.cli;
+import org.apache.flink.configuration.Configuration;
+
import java.net.URL;
import java.util.List;
@@ -35,6 +37,7 @@ public class CliOptions {
private final List<URL> libraryDirs;
private final String updateStatement;
private final String historyFilePath;
+ private final Configuration pythonConfiguration;
public CliOptions(
boolean isPrintHelp,
@@ -44,7 +47,8 @@ public class CliOptions {
List<URL> jars,
List<URL> libraryDirs,
String updateStatement,
- String historyFilePath) {
+ String historyFilePath,
+ Configuration pythonConfiguration) {
this.isPrintHelp = isPrintHelp;
this.sessionId = sessionId;
this.environment = environment;
@@ -53,6 +57,7 @@ public class CliOptions {
this.libraryDirs = libraryDirs;
this.updateStatement = updateStatement;
this.historyFilePath = historyFilePath;
+ this.pythonConfiguration = pythonConfiguration;
}
public boolean isPrintHelp() {
@@ -86,4 +91,8 @@ public class CliOptions {
public String getHistoryFilePath() {
return historyFilePath;
}
+
+ public Configuration getPythonConfiguration() {
+ return pythonConfiguration;
+ }
}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index a7626bb..2c6b7ef 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.client.cli;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.client.SqlClientException;
@@ -29,11 +30,18 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
+
/**
* Parser for command line options.
*/
@@ -145,6 +153,10 @@ public class CliOptionsParser {
options.addOption(OPTION_LIBRARY);
options.addOption(OPTION_UPDATE);
options.addOption(OPTION_HISTORY);
+ options.addOption(PYFILES_OPTION);
+ options.addOption(PYREQUIREMENTS_OPTION);
+ options.addOption(PYARCHIVE_OPTION);
+ options.addOption(PYEXEC_OPTION);
return options;
}
@@ -154,6 +166,10 @@ public class CliOptionsParser {
options.addOption(OPTION_ENVIRONMENT);
options.addOption(OPTION_UPDATE);
options.addOption(OPTION_HISTORY);
+ options.addOption(PYFILES_OPTION);
+ options.addOption(PYREQUIREMENTS_OPTION);
+ options.addOption(PYARCHIVE_OPTION);
+ options.addOption(PYEXEC_OPTION);
return options;
}
@@ -162,6 +178,10 @@ public class CliOptionsParser {
options.addOption(OPTION_DEFAULTS);
options.addOption(OPTION_JAR);
options.addOption(OPTION_LIBRARY);
+ options.addOption(PYFILES_OPTION);
+ options.addOption(PYREQUIREMENTS_OPTION);
+ options.addOption(PYARCHIVE_OPTION);
+ options.addOption(PYEXEC_OPTION);
return options;
}
@@ -249,7 +269,8 @@ public class CliOptionsParser {
checkUrls(line, CliOptionsParser.OPTION_JAR),
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
- line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt())
+ line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
+ getPythonConfiguration(line)
);
}
catch (ParseException e) {
@@ -269,7 +290,8 @@ public class CliOptionsParser {
checkUrls(line, CliOptionsParser.OPTION_JAR),
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
- line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt())
+ line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
+ getPythonConfiguration(line)
);
}
catch (ParseException e) {
@@ -289,7 +311,8 @@ public class CliOptionsParser {
checkUrls(line, CliOptionsParser.OPTION_JAR),
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
null,
- null
+ null,
+ getPythonConfiguration(line)
);
}
catch (ParseException e) {
@@ -331,4 +354,18 @@ public class CliOptionsParser {
}
return sessionId;
}
+
+ private static Configuration getPythonConfiguration(CommandLine line) {
+ try {
+ Class<?> clazz = Class.forName(
+ "org.apache.flink.python.util.PythonDependencyUtils",
+ true,
+ Thread.currentThread().getContextClassLoader());
+ Method parsePythonDependencyConfiguration =
+ clazz.getMethod("parsePythonDependencyConfiguration", CommandLine.class);
+ return (Configuration) parsePythonDependencyConfiguration.invoke(null, line);
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new SqlClientException("Failed to parse the Python command line options.", e);
+ }
+ }
}
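One design note on getPythonConfiguration(): PythonDependencyUtils is looked up via reflection rather than imported directly, presumably so that flink-sql-client does not need a compile-time dependency on the flink-python module. If flink-python is not on the classpath, the resulting ClassNotFoundException is wrapped into the SqlClientException shown above.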