Posted to commits@flink.apache.org by ji...@apache.org on 2019/05/28 07:35:07 UTC
[flink] 01/02: [FLINK-12327][python] Adds support to submit Python
Table API job in CliFrontend This closes #8472
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ec61cf21276fb6eea599b40ea999613ffeef48ef
Author: Huang Xingbo <hx...@gmail.com>
AuthorDate: Fri May 17 11:17:23 2019 +0800
[FLINK-12327][python] Adds support to submit Python Table API job in CliFrontend
This closes #8472
---
docs/ops/cli.md | 66 ++++++
docs/ops/cli.zh.md | 63 +++++-
.../org/apache/flink/client/cli/CliFrontend.java | 56 ++++--
.../apache/flink/client/cli/CliFrontendParser.java | 29 +++
.../apache/flink/client/cli/ProgramOptions.java | 74 ++++++-
.../flink/client/program/PackagedProgram.java | 32 ++-
.../apache/flink/client/python/PythonDriver.java | 168 ++++++++++++++++
.../flink/client/python/PythonGatewayServer.java | 21 +-
.../org/apache/flink/client/python/PythonUtil.java | 223 +++++++++++++++++++++
.../flink/client/python/PythonDriverTest.java | 104 ++++++++++
.../apache/flink/client/python/PythonUtilTest.java | 118 +++++++++++
flink-dist/pom.xml | 3 +
flink-dist/src/main/assemblies/bin.xml | 7 +
flink-python/pyflink/find_flink_home.py | 3 +
flink-python/pyflink/java_gateway.py | 14 +-
.../pyflink/table/examples/batch/__init__.py | 17 ++
.../pyflink/table/examples/batch/word_count.py | 79 ++++++++
17 files changed, 1032 insertions(+), 45 deletions(-)
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index b414b30..505207d 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -47,6 +47,12 @@ available.
{:toc}
## Examples
+### Job Submission Examples
+-----------------------------
+
+These examples show how to submit a job via the CLI.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
- Run example program with no arguments:
@@ -88,6 +94,53 @@ available.
./examples/batch/WordCount.jar \
--input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
+</div>
+
+<div data-lang="python" markdown="1">
+
+- Run Python Table program:
+
+ ./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+- Run Python Table program with pyFiles:
+
+ ./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar> \
+ -pyfs file:///user.txt,hdfs:///$namenode_address/username.txt
+
+- Run Python Table program with pyFiles and pyModule:
+
+ ./bin/flink run -pym batch.word_count -pyfs examples/python/table/batch -j <path/to/flink-table.jar>
+
+- Run Python Table program with parallelism 16:
+
+ ./bin/flink run -p 16 -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+- Run Python Table program with flink log output disabled:
+
+ ./bin/flink run -q -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+- Run Python Table program in detached mode:
+
+ ./bin/flink run -d examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+- Run Python Table program on a specific JobManager:
+
+ ./bin/flink run -m myJMHost:8081 \
+ -py examples/python/table/batch/word_count.py \
+ -j <path/to/flink-table.jar>
+
+- Run Python Table program using a [per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
+
+ ./bin/flink run -m yarn-cluster -yn 2 \
+ -py examples/python/table/batch/word_count.py \
+ -j <path/to/flink-table.jar>
+</div>
+
+### Job Management Examples
+-----------------------------
+
+These examples show how to manage a job via the CLI.
+
- Display the optimized execution plan for the WordCount example program as JSON:
./bin/flink info ./examples/batch/WordCount.jar \
@@ -251,6 +304,19 @@ Action "run" compiles and runs a program.
program. Optional flag to override the
default value specified in the
configuration.
+ -py,--python <python-file> Python script with the program entry
point. The dependent resources can be
+ configured with the `--pyFiles` option.
+ -pyfs,--pyFiles <python-files> Attach custom python files for the job.
+ Comma can be used as the separator to
+ specify multiple files. The standard
+ python resource file suffixes such as
+ .py/.egg/.zip are all supported.
+ (e.g. --pyFiles file:///tmp/myresource.zip
+ ,hdfs:///$namenode_address/myresource2.zip)
+ -pym,--pyModule <python-module> Python module with the program entry
+ point. This option must be used in
+ conjunction with `--pyFiles`.
-q,--sysoutLogging If present, suppress logging output to
standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index 7c02047..93f16fb 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -47,6 +47,12 @@ available.
{:toc}
## Examples
+### Job Submission Examples
+-----------------------------
+
+These examples show how to submit a job via the CLI.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
- Run example program with no arguments:
@@ -82,11 +88,57 @@ available.
./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
-- Run example program using a [per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
+- Run example program using a [per-job YARN cluster]({{site.baseurl}}/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
./bin/flink run -m yarn-cluster -yn 2 \
./examples/batch/WordCount.jar \
--input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
+
+</div>
+
+<div data-lang="python" markdown="1">
+
+- Submit a Python Table job:
+
+ ./bin/flink run -py WordCount.py -j <path/to/flink-table.jar>
+
+- Submit a Python Table job with multiple dependencies:
+
+ ./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar> \
+ -pyfs file:///user.txt,hdfs:///$namenode_address/username.txt
+
+- Submit a Python Table job with multiple dependencies, where the main entry point of the Python job is specified with the pym option:
+
+ ./bin/flink run -pym batch.word_count -pyfs examples/python/table/batch -j <path/to/flink-table.jar>
+
+- Submit a Python Table job with parallelism 16:
+
+ ./bin/flink run -p 16 -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+- Submit a Python Table job with flink log output disabled:
+
+ ./bin/flink run -q -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+- Submit a Python Table job in detached mode:
+
+ ./bin/flink run -d examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+- Submit a Python Table job that runs on a specific JobManager:
+
+ ./bin/flink run -m myJMHost:8081 \
+ -py examples/python/table/batch/word_count.py \
+ -j <path/to/flink-table.jar>
+
+- Submit a Python Table job that runs on a [per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
+
+ ./bin/flink run -m yarn-cluster -yn 2 \
+ -py examples/python/table/batch/word_count.py \
+ -j <path/to/flink-table.jar>
+
+</div>
+
+### Job Management Examples
+-----------------------------
- Display the optimized execution plan for the WordCount example program as JSON:
@@ -251,6 +303,15 @@ Action "run" compiles and runs a program.
program. Optional flag to override the
default value specified in the
configuration.
+ -py,--python <python-file> Specifies the entry point of the Python
+ job. The dependent resources can be
+ specified via `--pyFiles`.
+ -pyfs,--pyFiles <python-files> Specifies custom python files that the job
+ depends on. Multiple files can be
+ separated by commas (,). The standard
+ python resource file suffixes such as
+ .py/.egg/.zip are all supported.
+ (e.g. --pyFiles file:///tmp/myresource.zip
+ ,hdfs:///$namenode_address/myresource2.zip)
+ -pym,--pyModule <python-module> Specifies the module entry point of the
+ python program. This option must be used
+ in conjunction with `--pyFiles`.
-q,--sysoutLogging If present, suppress logging output to
standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c6b5c9a..c591e6e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -32,6 +32,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.client.python.PythonDriver;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -185,8 +186,11 @@ public class CliFrontend {
return;
}
- if (runOptions.getJarFilePath() == null) {
- throw new CliArgsException("The program JAR file was not specified.");
+ if (!runOptions.isPython()) {
+ // A Java program must be specified by a JAR file
+ if (runOptions.getJarFilePath() == null) {
+ throw new CliArgsException("A Java program must be specified by a JAR file.");
+ }
}
final PackagedProgram program;
@@ -771,12 +775,42 @@ public class CliFrontend {
String jarFilePath = options.getJarFilePath();
List<URL> classpaths = options.getClasspaths();
- if (jarFilePath == null) {
- throw new IllegalArgumentException("The program JAR file was not specified.");
+ String entryPointClass;
+ File jarFile = null;
+ if (options.isPython()) {
+ // If a jar file was specified for the job
+ if (jarFilePath != null) {
+ jarFile = getJarFile(jarFilePath);
+ }
+ // The entry point class of python job is PythonDriver
+ entryPointClass = PythonDriver.class.getCanonicalName();
+ } else {
+ if (jarFilePath == null) {
+ throw new IllegalArgumentException("The program JAR file was not specified.");
+ }
+ jarFile = getJarFile(jarFilePath);
+ // Get assembler class
+ entryPointClass = options.getEntryPointClassName();
}
- File jarFile = new File(jarFilePath);
+ PackagedProgram program = entryPointClass == null ?
+ new PackagedProgram(jarFile, classpaths, programArgs) :
+ new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
+
+ program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
+
+ return program;
+ }
+ /**
+ * Gets the JAR file from the path.
+ *
+ * @param jarFilePath The path of JAR file
+ * @return The JAR file
+ * @throws FileNotFoundException The JAR file does not exist.
+ */
+ private File getJarFile(String jarFilePath) throws FileNotFoundException {
+ File jarFile = new File(jarFilePath);
// Check if JAR file exists
if (!jarFile.exists()) {
throw new FileNotFoundException("JAR file does not exist: " + jarFile);
@@ -784,17 +818,7 @@ public class CliFrontend {
else if (!jarFile.isFile()) {
throw new FileNotFoundException("JAR file is not a file: " + jarFile);
}
-
- // Get assembler class
- String entryPointClass = options.getEntryPointClassName();
-
- PackagedProgram program = entryPointClass == null ?
- new PackagedProgram(jarFile, classpaths, programArgs) :
- new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
-
- program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
-
- return program;
+ return jarFile;
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index cea3998..5872a54 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -118,6 +118,20 @@ public class CliFrontendParser {
public static final Option STOP_AND_DRAIN = new Option("d", "drain", false,
"Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");
+ static final Option PY_OPTION = new Option("py", "python", true,
+ "Python script with the program entry point. " +
+ "The dependent resources can be configured with the `--pyFiles` option.");
+
+ static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
+ "Attach custom python files for job. " +
+ "Comma can be used as the separator to specify multiple files. " +
+ "The standard python resource file suffixes such as .py/.egg/.zip are all supported." +
+ "(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)");
+
+ static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
+ "Python module with the program entry point. " +
+ "This option must be used in conjunction with `--pyFiles`.");
+
static {
HELP_OPTION.setRequired(false);
@@ -165,6 +179,15 @@ public class CliFrontendParser {
STOP_WITH_SAVEPOINT.setOptionalArg(true);
STOP_AND_DRAIN.setRequired(false);
+
+ PY_OPTION.setRequired(false);
+ PY_OPTION.setArgName("python");
+
+ PYFILES_OPTION.setRequired(false);
+ PYFILES_OPTION.setArgName("pyFiles");
+
+ PYMODULE_OPTION.setRequired(false);
+ PYMODULE_OPTION.setArgName("pyModule");
}
private static final Options RUN_OPTIONS = getRunCommandOptions();
@@ -186,6 +209,9 @@ public class CliFrontendParser {
options.addOption(DETACHED_OPTION);
options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
options.addOption(YARN_DETACHED_OPTION);
+ options.addOption(PY_OPTION);
+ options.addOption(PYFILES_OPTION);
+ options.addOption(PYMODULE_OPTION);
return options;
}
@@ -196,6 +222,9 @@ public class CliFrontendParser {
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
+ options.addOption(PY_OPTION);
+ options.addOption(PYFILES_OPTION);
+ options.addOption(PYMODULE_OPTION);
return options;
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index da03d64..30b3867 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -36,6 +36,9 @@ import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PY_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
@@ -62,17 +65,71 @@ public abstract class ProgramOptions extends CommandLineOptions {
private final SavepointRestoreSettings savepointSettings;
+ /**
+ * Flag indicating whether the job is a Python job.
+ */
+ private final boolean isPython;
+
protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);
String[] args = line.hasOption(ARGS_OPTION.getOpt()) ?
- line.getOptionValues(ARGS_OPTION.getOpt()) :
- line.getArgs();
+ line.getOptionValues(ARGS_OPTION.getOpt()) :
+ line.getArgs();
+
+ isPython = line.hasOption(PY_OPTION.getOpt()) || line.hasOption(PYMODULE_OPTION.getOpt());
+ // If specified the option -py(--python)
+ if (line.hasOption(PY_OPTION.getOpt())) {
+ // Cannot use option -py and -pym simultaneously.
+ if (line.hasOption(PYMODULE_OPTION.getOpt())) {
+ throw new CliArgsException("Cannot use option -py and -pym simultaneously.");
+ }
+ // The cli cmd args which will be transferred to PythonDriver will be transformed as follows:
+ // CLI cmd : -py ${python.py} [-pyfs ${py-files}] [${other args}].
+ // PythonDriver args: py ${python.py} [pyfs ${py-files}] [${other args}].
+ // -------------------------------transformed-------------------------------------------------------
+ // e.g. -py wordcount.py(CLI cmd) -----------> py wordcount.py(PythonDriver args)
+ // e.g. -py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(CLI cmd)
+ // -----> py wordcount.py pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(PythonDriver args)
+ String[] newArgs;
+ int argIndex;
+ if (line.hasOption(PYFILES_OPTION.getOpt())) {
+ newArgs = new String[args.length + 4];
+ newArgs[2] = PYFILES_OPTION.getOpt();
+ newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
+ argIndex = 4;
+ } else {
+ newArgs = new String[args.length + 2];
+ argIndex = 2;
+ }
+ newArgs[0] = PY_OPTION.getOpt();
+ newArgs[1] = line.getOptionValue(PY_OPTION.getOpt());
+ System.arraycopy(args, 0, newArgs, argIndex, args.length);
+ args = newArgs;
+ }
+
+ // If specified the option -pym(--pyModule)
+ if (line.hasOption(PYMODULE_OPTION.getOpt())) {
+ // If you specify the option -pym, you should specify the option --pyFiles simultaneously.
+ if (!line.hasOption(PYFILES_OPTION.getOpt())) {
+ throw new CliArgsException("-pym must be used in conjunction with `--pyFiles`");
+ }
+ // The cli cmd args which will be transferred to PythonDriver will be transformed as follows:
+ // CLI cmd : -pym ${py-module} -pyfs ${py-files} [${other args}].
+ // PythonDriver args: pym ${py-module} pyfs ${py-files} [${other args}].
+ // e.g. -pym AAA.fun -pyfs AAA.zip(CLI cmd) ----> pym AAA.fun pyfs AAA.zip(PythonDriver args)
+ String[] newArgs = new String[args.length + 4];
+ newArgs[0] = PYMODULE_OPTION.getOpt();
+ newArgs[1] = line.getOptionValue(PYMODULE_OPTION.getOpt());
+ newArgs[2] = PYFILES_OPTION.getOpt();
+ newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
+ System.arraycopy(args, 0, newArgs, 4, args.length);
+ args = newArgs;
+ }
if (line.hasOption(JAR_OPTION.getOpt())) {
this.jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
- }
- else if (args.length > 0) {
+ } else if (!isPython && args.length > 0) {
jarFilePath = args[0];
args = Arrays.copyOfRange(args, 1, args.length);
}
@@ -95,7 +152,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
this.classpaths = classpaths;
this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
- line.getOptionValue(CLASS_OPTION.getOpt()) : null;
+ line.getOptionValue(CLASS_OPTION.getOpt()) : null;
if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
@@ -156,4 +213,11 @@ public abstract class ProgramOptions extends CommandLineOptions {
public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointSettings;
}
+
+ /**
+ * Indicates whether the job is a Python job.
+ */
+ public boolean isPython() {
+ return isPython;
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 8f5ccba..77b5d29 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.client.python.PythonDriver;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
@@ -91,6 +92,11 @@ public class PackagedProgram {
private SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
/**
+ * Flag indicating whether the job is a Python job.
+ */
+ private final boolean isPython;
+
+ /**
* Creates an instance that wraps the plan defined in the jar file using the given
* argument.
*
@@ -169,18 +175,21 @@ public class PackagedProgram {
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
- if (jarFile == null) {
- throw new IllegalArgumentException("The jar file must not be null.");
- }
+ // Whether the job is a Python job.
+ isPython = entryPointClassName != null && entryPointClassName.equals(PythonDriver.class.getCanonicalName());
- URL jarFileUrl;
- try {
- jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
- } catch (MalformedURLException e1) {
- throw new IllegalArgumentException("The jar file path is invalid.");
- }
+ URL jarFileUrl = null;
+ if (jarFile != null) {
+ try {
+ jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
+ } catch (MalformedURLException e1) {
+ throw new IllegalArgumentException("The jar file path is invalid.");
+ }
- checkJarFile(jarFileUrl);
+ checkJarFile(jarFileUrl);
+ } else if (!isPython) {
+ throw new IllegalArgumentException("The jar file must not be null.");
+ }
this.jarFile = jarFileUrl;
this.args = args == null ? new String[0] : args;
@@ -191,7 +200,7 @@ public class PackagedProgram {
}
// now that we have an entry point, we can extract the nested jar files (if any)
- this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
+ this.extractedTempLibraries = jarFileUrl == null ? Collections.emptyList() : extractContainedLibraries(jarFileUrl);
this.classpaths = classpaths;
this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
@@ -233,6 +242,7 @@ public class PackagedProgram {
// load the entry point class
this.mainClass = entryPointClass;
+ isPython = entryPointClass == PythonDriver.class;
// if the entry point is a program, instantiate the class and get the plan
if (Program.class.isAssignableFrom(this.mainClass)) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java
new file mode 100644
index 0000000..e43a24e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import py4j.GatewayServer;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A main class used to launch Python applications. It executes python as a
+ * subprocess and then has it connect back to the JVM to access system properties, etc.
+ */
+public class PythonDriver {
+ private static final Logger LOG = LoggerFactory.getLogger(PythonDriver.class);
+
+ public static void main(String[] args) {
+ // the python job needs at least 2 args.
+ // e.g. py a.py ...
+ // e.g. pym a.b pyfs a.zip ...
+ if (args.length < 2) {
+ LOG.error("Required at least two arguments, only python file or python module is available.");
+ System.exit(1);
+ }
+ // parse args
+ Map<String, List<String>> parsedArgs = parseOptions(args);
+ // start gateway server
+ GatewayServer gatewayServer = startGatewayServer();
+ // prepare python env
+
+ // map filename to its Path
+ Map<String, Path> filePathMap = new HashMap<>();
+ // commands which will be executed in the python process.
+ List<String> commands = constructPythonCommands(filePathMap, parsedArgs);
+ try {
+ // prepare the exec environment of the python process.
+ PythonUtil.PythonEnvironment pythonEnv = PythonUtil.preparePythonEnvironment(filePathMap);
+ // set the env variable PYFLINK_GATEWAY_PORT so the python process can connect to the gateway.
+ pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
+ // start the python process.
+ Process pythonProcess = PythonUtil.startPythonProcess(pythonEnv, commands);
+ int exitCode = pythonProcess.waitFor();
+ if (exitCode != 0) {
+ throw new RuntimeException("Python process exits with code: " + exitCode);
+ }
+ } catch (Throwable e) {
+ LOG.error("Run python process failed", e);
+ } finally {
+ gatewayServer.shutdown();
+ }
+ }
+
+ /**
+ * Creates a GatewayServer that runs in a daemon thread.
+ *
+ * @return The created GatewayServer
+ */
+ public static GatewayServer startGatewayServer() {
+ InetAddress localhost = InetAddress.getLoopbackAddress();
+ GatewayServer gatewayServer = new GatewayServer.GatewayServerBuilder()
+ .javaPort(0)
+ .javaAddress(localhost)
+ .build();
+ Thread thread = new Thread(gatewayServer::start);
+ thread.setName("py4j-gateway");
+ thread.setDaemon(true);
+ thread.start();
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ LOG.error("The gateway server thread join failed.", e);
+ System.exit(1);
+ }
+ return gatewayServer;
+ }
+
+ /**
+ * Constructs the Python commands which will be executed in the python process.
+ *
+ * @param filePathMap stores python file name to its path
+ * @param parsedArgs parsed args
+ */
+ public static List<String> constructPythonCommands(Map<String, Path> filePathMap, Map<String, List<String>> parsedArgs) {
+ List<String> commands = new ArrayList<>();
+ if (parsedArgs.containsKey("py")) {
+ String pythonFile = parsedArgs.get("py").get(0);
+ Path pythonFilePath = new Path(pythonFile);
+ filePathMap.put(pythonFilePath.getName(), pythonFilePath);
+ commands.add(pythonFilePath.getName());
+ }
+ if (parsedArgs.containsKey("pym")) {
+ String pyModule = parsedArgs.get("pym").get(0);
+ commands.add("-m");
+ commands.add(pyModule);
+ }
+ if (parsedArgs.containsKey("pyfs")) {
+ List<String> pyFiles = parsedArgs.get("pyfs");
+ for (String pyFile : pyFiles) {
+ Path pyFilePath = new Path(pyFile);
+ filePathMap.put(pyFilePath.getName(), pyFilePath);
+ }
+ }
+ if (parsedArgs.containsKey("args")) {
+ commands.addAll(parsedArgs.get("args"));
+ }
+ return commands;
+ }
+
+ /**
+ * Parses the args to the map format.
+ *
+ * @param args ["py", "xxx.py",
+ * "pyfs", "a.py,b.py,c.py",
+ * "--input", "in.txt"]
+ * @return {"py"->List("xxx.py"),"pyfs"->List("a.py","b.py","c.py"),"args"->List("--input","in.txt")}
+ */
+ public static Map<String, List<String>> parseOptions(String[] args) {
+ Map<String, List<String>> parsedArgs = new HashMap<>();
+ int argIndex = 0;
+ boolean isEntrypointSpecified = false;
+ // valid args should include python or pyModule field and their value.
+ if (args[0].equals("py") || args[0].equals("pym")) {
+ parsedArgs.put(args[0], Collections.singletonList(args[1]));
+ argIndex = 2;
+ isEntrypointSpecified = true;
+ }
+ if (isEntrypointSpecified && args.length > 2 && args[2].equals("pyfs")) {
+ List<String> pyFilesList = new ArrayList<>(Arrays.asList(args[3].split(",")));
+ parsedArgs.put(args[2], pyFilesList);
+ argIndex = 4;
+ }
+ if (!isEntrypointSpecified) {
+ throw new RuntimeException("The Python entrypoint has not been specified. It can be specified with option -py or -pym");
+ }
+ // any remaining args are collected under the key "args".
+ if (args.length > argIndex) {
+ List<String> otherArgList = new ArrayList<>(args.length - argIndex);
+ otherArgList.addAll(Arrays.asList(args).subList(argIndex, args.length));
+ parsedArgs.put("args", otherArgList);
+ }
+ return parsedArgs;
+ }
+}
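For context: the PYFLINK_GATEWAY_PORT handshake above has a Python-side counterpart in which the launched interpreter reads the port and connects back into the parent JVM via py4j. A minimal sketch of that connect-back, assuming py4j is importable in the child process (it mirrors what pyflink/java_gateway.py does rather than quoting it):

    import os
    from py4j.java_gateway import JavaGateway, GatewayParameters

    # PythonDriver exports PYFLINK_GATEWAY_PORT before spawning the
    # interpreter, so the child process can dial back into the JVM.
    port = int(os.environ["PYFLINK_GATEWAY_PORT"])
    gateway = JavaGateway(
        gateway_parameters=GatewayParameters(port=port, auto_convert=True))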
diff --git a/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
index 6432a67..64f2ef1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
+import java.nio.file.Files;
/**
* The Py4j Gateway Server provides RPC service for user's python process.
@@ -56,15 +57,17 @@ public class PythonGatewayServer {
// Tells python side the port of our java rpc server
String handshakeFilePath = System.getenv("_PYFLINK_CONN_INFO_PATH");
File handshakeFile = new File(handshakeFilePath);
- if (handshakeFile.createNewFile()) {
- FileOutputStream fileOutputStream = new FileOutputStream(handshakeFile);
- DataOutputStream stream = new DataOutputStream(fileOutputStream);
- stream.writeInt(boundPort);
- stream.close();
- fileOutputStream.close();
- } else {
- System.out.println("Can't create handshake file: " + handshakeFilePath + ", now exit...");
- return;
+ File tmpPath = Files.createTempFile(handshakeFile.getParentFile().toPath(),
+ "connection", ".info").toFile();
+ FileOutputStream fileOutputStream = new FileOutputStream(tmpPath);
+ DataOutputStream stream = new DataOutputStream(fileOutputStream);
+ stream.writeInt(boundPort);
+ stream.close();
+ fileOutputStream.close();
+
+ if (!tmpPath.renameTo(handshakeFile)) {
+ System.out.println("Unable to write connection information to handshake file: " + handshakeFilePath + ", now exit...");
+ System.exit(1);
}
// Exit on EOF or broken pipe. This ensures that the server dies
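The write-then-rename above makes the handshake atomic: the Python side never observes a half-written port. A minimal sketch of the reading side, assuming the reader polls for the file named by _PYFLINK_CONN_INFO_PATH and decodes the 4-byte big-endian int produced by DataOutputStream.writeInt:

    import os
    import struct
    import time

    conn_info_path = os.environ["_PYFLINK_CONN_INFO_PATH"]
    # The rename publishes the file atomically, so once it exists the
    # full 4-byte payload is readable.
    while not os.path.exists(conn_info_path):
        time.sleep(0.1)
    with open(conn_info_path, "rb") as f:
        port = struct.unpack(">i", f.read(4))[0]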
diff --git a/flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java
new file mode 100644
index 0000000..b9012a3
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * A utility class that helps prepare the Python environment and run the python process.
+ */
+public final class PythonUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(PythonUtil.class);
+
+ private static final String FLINK_OPT_DIR = System.getenv("FLINK_OPT_DIR");
+
+ private static final String FLINK_OPT_DIR_PYTHON = FLINK_OPT_DIR + File.separator + "python";
+
+ /**
+ * Wraps Python exec environment.
+ */
+ public static class PythonEnvironment {
+ public String workingDirectory;
+
+ public String pythonExec = "python";
+
+ public String pythonPath;
+
+ Map<String, String> systemEnv = new HashMap<>();
+ }
+
+ /**
+ * The hook thread that deletes the tmp working dir of the python process after the python process shuts down.
+ */
+ private static class ShutDownPythonHook extends Thread {
+ private Process p;
+ private String pyFileDir;
+
+ public ShutDownPythonHook(Process p, String pyFileDir) {
+ this.p = p;
+ this.pyFileDir = pyFileDir;
+ }
+
+ public void run() {
+
+ p.destroyForcibly();
+
+ if (pyFileDir != null) {
+ File pyDir = new File(pyFileDir);
+ FileUtils.deleteDirectoryQuietly(pyDir);
+ }
+ }
+ }
+
+
+ /**
+ * Prepares PythonEnvironment to start python process.
+ *
+ * @param filePathMap map file name to its file path.
+ * @return the PythonEnvironment in which the python process will be executed.
+ */
+ public static PythonEnvironment preparePythonEnvironment(Map<String, Path> filePathMap) {
+ PythonEnvironment env = new PythonEnvironment();
+
+ // 1. setup temporary local directory for the user files
+ String tmpDir = System.getProperty("java.io.tmpdir") +
+ File.separator + "pyflink" + UUID.randomUUID();
+
+ Path tmpDirPath = new Path(tmpDir);
+ try {
+ FileSystem fs = tmpDirPath.getFileSystem();
+ if (fs.exists(tmpDirPath)) {
+ fs.delete(tmpDirPath, true);
+ }
+ fs.mkdirs(tmpDirPath);
+ } catch (IOException e) {
+ LOG.error("Prepare tmp directory failed.", e);
+ }
+
+ env.workingDirectory = tmpDirPath.toString();
+
+ StringBuilder pythonPathEnv = new StringBuilder();
+
+ pythonPathEnv.append(env.workingDirectory);
+
+ // 2. create symbolic links in the working directory for the pyflink dependency libs.
+ List<java.nio.file.Path> pythonLibs = getLibFiles(FLINK_OPT_DIR_PYTHON);
+ for (java.nio.file.Path libPath : pythonLibs) {
+ java.nio.file.Path symbolicLinkFilePath = FileSystems.getDefault().getPath(env.workingDirectory,
+ libPath.getFileName().toString());
+ createSymbolicLinkForPyflinkLib(libPath, symbolicLinkFilePath);
+ pythonPathEnv.append(File.pathSeparator);
+ pythonPathEnv.append(symbolicLinkFilePath.toString());
+ }
+
+ // 3. copy relevant python files to tmp dir and set them in PYTHONPATH.
+ filePathMap.forEach((sourceFileName, sourcePath) -> {
+ Path targetPath = new Path(tmpDirPath, sourceFileName);
+ try {
+ FileUtils.copy(sourcePath, targetPath, true);
+ } catch (IOException e) {
+ LOG.error("Copy files to tmp dir failed", e);
+ }
+ String targetFileName = targetPath.toString();
+ pythonPathEnv.append(File.pathSeparator);
+ pythonPathEnv.append(targetFileName);
+
+ });
+
+ env.pythonPath = pythonPathEnv.toString();
+ return env;
+ }
+
+ /**
+ * Gets pyflink dependent libs in specified directory.
+ *
+ * @param libDir The lib directory
+ */
+ public static List<java.nio.file.Path> getLibFiles(String libDir) {
+ final List<java.nio.file.Path> libFiles = new ArrayList<>();
+ SimpleFileVisitor<java.nio.file.Path> finder = new SimpleFileVisitor<java.nio.file.Path>() {
+ @Override
+ public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException {
+ // exclude .txt file
+ if (!file.toString().endsWith(".txt")) {
+ libFiles.add(file);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+ };
+ try {
+ Files.walkFileTree(FileSystems.getDefault().getPath(libDir), finder);
+ } catch (IOException e) {
+ LOG.error("Gets pyflink dependent libs failed.", e);
+ }
+ return libFiles;
+ }
+
+ /**
+ * Creates a symbolic link in the working directory for a pyflink lib.
+ *
+ * @param libPath the pyflink lib file path.
+ * @param symbolicLinkPath the symbolic link to pyflink lib.
+ */
+ public static void createSymbolicLinkForPyflinkLib(java.nio.file.Path libPath, java.nio.file.Path symbolicLinkPath) {
+ try {
+ Files.createSymbolicLink(symbolicLinkPath, libPath);
+ } catch (IOException e) {
+ LOG.error("Create symbol link for pyflink lib failed.", e);
+ LOG.info("Try to copy pyflink lib to working directory");
+ try {
+ Files.copy(libPath, symbolicLinkPath);
+ } catch (IOException ex) {
+ LOG.error("Copy pylink lib to working directory failed", ex);
+ }
+ }
+ }
+
+ /**
+ * Starts python process.
+ *
+ * @param pythonEnv the python environment in which the python process will run.
+ * @param commands the commands that the python process will execute.
+ * @return the Process representing the python process.
+ * @throws IOException Thrown if an error occurs when starting the python process.
+ */
+ public static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> commands) throws IOException {
+ ProcessBuilder pythonProcessBuilder = new ProcessBuilder();
+ Map<String, String> env = pythonProcessBuilder.environment();
+ env.put("PYTHONPATH", pythonEnv.pythonPath);
+ pythonEnv.systemEnv.forEach(env::put);
+ commands.add(0, pythonEnv.pythonExec);
+ pythonProcessBuilder.command(commands);
+ // set the working directory.
+ pythonProcessBuilder.directory(new File(pythonEnv.workingDirectory));
+ // redirect the stderr to stdout
+ pythonProcessBuilder.redirectErrorStream(true);
+ // set the child process the output same as the parent process.
+ pythonProcessBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
+ Process process = pythonProcessBuilder.start();
+ if (!process.isAlive()) {
+ throw new RuntimeException("Failed to start Python process. ");
+ }
+
+ // Make sure that the python sub process will be killed when the JVM exits
+ ShutDownPythonHook hook = new ShutDownPythonHook(process, pythonEnv.workingDirectory);
+ Runtime.getRuntime().addShutdownHook(hook);
+
+ return process;
+ }
+}
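A note on what preparePythonEnvironment buys the launched interpreter: the working directory, each symlinked pyflink lib, and every staged user file end up on PYTHONPATH, so user modules import without any installation step. A tiny illustrative check that could run inside the child process (purely a sketch, not part of the patch):

    import os

    # Everything PythonUtil staged (working dir, pyflink libs, user
    # files) is importable because it was appended to PYTHONPATH.
    for entry in os.environ.get("PYTHONPATH", "").split(os.pathsep):
        print(entry)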
diff --git a/flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java b/flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
new file mode 100644
index 0000000..0b6f570
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Assert;
+import org.junit.Test;
+import py4j.GatewayServer;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link PythonDriver}.
+ */
+public class PythonDriverTest {
+ @Test
+ public void testStartGatewayServer() {
+ GatewayServer gatewayServer = PythonDriver.startGatewayServer();
+ try {
+ Socket socket = new Socket("localhost", gatewayServer.getListeningPort());
+ Assert.assertTrue(socket.isConnected());
+ } catch (IOException e) {
+ throw new RuntimeException("Connect Gateway Server failed");
+ } finally {
+ gatewayServer.shutdown();
+ }
+ }
+
+ @Test
+ public void testConstructCommands() {
+ Map<String, Path> filePathMap = new HashMap<>();
+ Map<String, List<String>> parseArgs = new HashMap<>();
+ parseArgs.put("py", Collections.singletonList("xxx.py"));
+ List<String> pyFilesList = new ArrayList<>();
+ pyFilesList.add("a.py");
+ pyFilesList.add("b.py");
+ pyFilesList.add("c.py");
+ parseArgs.put("pyfs", pyFilesList);
+ List<String> otherArgs = new ArrayList<>();
+ otherArgs.add("--input");
+ otherArgs.add("in.txt");
+ parseArgs.put("args", otherArgs);
+ List<String> commands = PythonDriver.constructPythonCommands(filePathMap, parseArgs);
+ Path pythonPath = filePathMap.get("xxx.py");
+ Assert.assertNotNull(pythonPath);
+ Assert.assertEquals(pythonPath.getName(), "xxx.py");
+ Path aPyFilePath = filePathMap.get("a.py");
+ Assert.assertNotNull(aPyFilePath);
+ Assert.assertEquals(aPyFilePath.getName(), "a.py");
+ Path bPyFilePath = filePathMap.get("b.py");
+ Assert.assertNotNull(bPyFilePath);
+ Assert.assertEquals(bPyFilePath.getName(), "b.py");
+ Path cPyFilePath = filePathMap.get("c.py");
+ Assert.assertNotNull(cPyFilePath);
+ Assert.assertEquals(cPyFilePath.getName(), "c.py");
+ Assert.assertEquals(3, commands.size());
+ Assert.assertEquals(commands.get(0), "xxx.py");
+ Assert.assertEquals(commands.get(1), "--input");
+ Assert.assertEquals(commands.get(2), "in.txt");
+ }
+
+ @Test
+ public void testParseOptions() {
+ String[] args = {"py", "xxx.py", "pyfs", "a.py,b.py,c.py", "--input", "in.txt"};
+ Map<String, List<String>> parsedArgs = PythonDriver.parseOptions(args);
+ List<String> pythonMainFile = parsedArgs.get("py");
+ Assert.assertNotNull(pythonMainFile);
+ Assert.assertEquals(1, pythonMainFile.size());
+ Assert.assertEquals(pythonMainFile.get(0), args[1]);
+ List<String> pyFilesList = parsedArgs.get("pyfs");
+ Assert.assertEquals(3, pyFilesList.size());
+ String[] pyFiles = args[3].split(",");
+ for (int i = 0; i < pyFiles.length; i++) {
+ Assert.assertEquals(pyFiles[i], pyFilesList.get(i));
+ }
+ List<String> otherArgs = parsedArgs.get("args");
+ for (int i = 4; i < args.length; i++) {
+ Assert.assertEquals(otherArgs.get(i - 4), args[i]);
+ }
+ }
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java b/flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java
new file mode 100644
index 0000000..4b14ced
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests for the {@link PythonUtil}.
+ */
+public class PythonUtilTest {
+ private Path sourceTmpDirPath;
+ private Path targetTmpDirPath;
+ private FileSystem sourceFs;
+ private FileSystem targetFs;
+
+ @Before
+ public void prepareTestEnvironment() {
+ String sourceTmpDir = System.getProperty("java.io.tmpdir") +
+ File.separator + "source_" + UUID.randomUUID();
+ String targetTmpDir = System.getProperty("java.io.tmpdir") +
+ File.separator + "target_" + UUID.randomUUID();
+
+ sourceTmpDirPath = new Path(sourceTmpDir);
+ targetTmpDirPath = new Path(targetTmpDir);
+ try {
+ sourceFs = sourceTmpDirPath.getFileSystem();
+ if (sourceFs.exists(sourceTmpDirPath)) {
+ sourceFs.delete(sourceTmpDirPath, true);
+ }
+ sourceFs.mkdirs(sourceTmpDirPath);
+ targetFs = targetTmpDirPath.getFileSystem();
+ if (targetFs.exists(targetTmpDirPath)) {
+ targetFs.delete(targetTmpDirPath, true);
+ }
+ targetFs.mkdirs(targetTmpDirPath);
+ } catch (IOException e) {
+ throw new RuntimeException("initial PythonUtil test environment failed");
+ }
+ }
+
+ @Test
+ public void testStartPythonProcess() {
+ PythonUtil.PythonEnvironment pythonEnv = new PythonUtil.PythonEnvironment();
+ pythonEnv.workingDirectory = targetTmpDirPath.toString();
+ pythonEnv.pythonPath = targetTmpDirPath.toString();
+ List<String> commands = new ArrayList<>();
+ Path pyPath = new Path(targetTmpDirPath, "word_count.py");
+ try {
+ targetFs.create(pyPath, FileSystem.WriteMode.OVERWRITE);
+ File pyFile = new File(pyPath.toString());
+ String pyProgram = "#!/usr/bin/python\n" +
+ "# -*- coding: UTF-8 -*-\n" +
+ "import sys\n" +
+ "\n" +
+ "if __name__=='__main__':\n" +
+ "\tfilename = sys.argv[1]\n" +
+ "\tfo = open(filename, \"w\")\n" +
+ "\tfo.write( \"hello world\")\n" +
+ "\tfo.close()";
+ Files.write(pyFile.toPath(), pyProgram.getBytes(), StandardOpenOption.WRITE);
+ Path result = new Path(targetTmpDirPath, "word_count_result.txt");
+ commands.add(pyFile.getName());
+ commands.add(result.getName());
+ Process pythonProcess = PythonUtil.startPythonProcess(pythonEnv, commands);
+ int exitCode = pythonProcess.waitFor();
+ if (exitCode != 0) {
+ throw new RuntimeException("Python process exits with code: " + exitCode);
+ }
+ String cmdResult = new String(Files.readAllBytes(new File(result.toString()).toPath()));
+ Assert.assertEquals(cmdResult, "hello world");
+ pythonProcess.destroyForcibly();
+ targetFs.delete(pyPath, true);
+ targetFs.delete(result, true);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException("test start Python process failed " + e.getMessage());
+ }
+ }
+
+ @After
+ public void cleanEnvironment() {
+ try {
+ sourceFs.delete(sourceTmpDirPath, true);
+ targetFs.delete(targetTmpDirPath, true);
+ } catch (IOException e) {
+ throw new RuntimeException("delete tmp dir failed " + e.getMessage());
+ }
+ }
+}
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 52a6466..1350f10 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -572,6 +572,9 @@ under the License.
<relocation>
<pattern>py4j</pattern>
<shadedPattern>org.apache.flink.api.python.py4j</shadedPattern>
+ <includes>
+ <include>py4j.*</include>
+ </includes>
</relocation>
</relocations>
</configuration>
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 3eb5698..788ec1b 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -242,6 +242,13 @@ under the License.
<fileMode>0755</fileMode>
</fileSet>
+ <!-- copy python table example to examples of dist -->
+ <fileSet>
+ <directory>../flink-python/pyflink/table/examples</directory>
+ <outputDirectory>examples/python/table</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+
</fileSets>
</assembly>
diff --git a/flink-python/pyflink/find_flink_home.py b/flink-python/pyflink/find_flink_home.py
index 0491368..9806490 100644
--- a/flink-python/pyflink/find_flink_home.py
+++ b/flink-python/pyflink/find_flink_home.py
@@ -28,6 +28,9 @@ def _find_flink_home():
# If the environment has set FLINK_HOME, trust it.
if 'FLINK_HOME' in os.environ:
return os.environ['FLINK_HOME']
+ elif 'FLINK_ROOT_DIR' in os.environ:
+ os.environ['FLINK_HOME'] = os.environ['FLINK_ROOT_DIR']
+ return os.environ['FLINK_ROOT_DIR']
else:
try:
flink_root_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../")
diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index e5c8330..b218d23 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -28,7 +28,6 @@ from threading import RLock
from py4j.java_gateway import java_import, JavaGateway, GatewayParameters
from pyflink.find_flink_home import _find_flink_home
-
_gateway = None
_lock = RLock()
@@ -46,6 +45,9 @@ def get_gateway():
_gateway = JavaGateway(gateway_parameters=gateway_param)
else:
_gateway = launch_gateway()
+
+ # import the flink view
+ import_flink_view(_gateway)
return _gateway
@@ -97,6 +99,14 @@ def launch_gateway():
gateway = JavaGateway(
gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True))
+ return gateway
+
+
+def import_flink_view(gateway):
+ """
+ Import the classes used by PyFlink.
+ :param gateway: the gateway connected to the JavaGatewayServer
+ """
# Import the classes used by PyFlink
java_import(gateway.jvm, "org.apache.flink.table.api.*")
java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
@@ -109,5 +119,3 @@ def launch_gateway():
java_import(gateway.jvm, "org.apache.flink.api.java.ExecutionEnvironment")
java_import(gateway.jvm,
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment")
-
- return gateway
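Moving the java_import calls out of launch_gateway() into import_flink_view() means the Flink classes are registered on both paths through get_gateway(): attaching to a CLI-provided gateway port and launching a fresh JVM. A hedged usage sketch (the class lookup is illustrative of what the imports make visible):

    from pyflink.java_gateway import get_gateway

    gateway = get_gateway()
    # After import_flink_view(), Table API classes resolve directly on
    # the JVM view instead of needing fully qualified names.
    TableEnvironment = gateway.jvm.TableEnvironment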
diff --git a/flink-python/pyflink/table/examples/batch/__init__.py b/flink-python/pyflink/table/examples/batch/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-python/pyflink/table/examples/batch/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py
new file mode 100644
index 0000000..a324af4
--- /dev/null
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -0,0 +1,79 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import tempfile
+
+from pyflink.table import TableEnvironment, TableConfig
+from pyflink.table.table_sink import CsvTableSink
+from pyflink.table.table_source import CsvTableSource
+from pyflink.table.types import DataTypes
+
+
+# TODO: word_count.py is just a test example for the CLI.
+# After pyflink has aligned with the Java Table API connectors, this example will be improved.
+def word_count():
+ tmp_dir = tempfile.gettempdir()
+ source_path = tmp_dir + '/streaming.csv'
+ if os.path.isfile(source_path):
+ os.remove(source_path)
+ content = "line Licensed to the Apache Software Foundation ASF under one " \
+ "line or more contributor license agreements See the NOTICE file " \
+ "line distributed with this work for additional information " \
+ "line regarding copyright ownership The ASF licenses this file " \
+ "to you under the Apache License Version the " \
+ "License you may not use this file except in compliance " \
+ "with the License"
+
+ with open(source_path, 'w') as f:
+ for word in content.split(" "):
+ f.write(",".join([word, "1"]))
+ f.write("\n")
+ f.flush()
+ f.close()
+
+ t_config = TableConfig.Builder().as_batch_execution().set_parallelism(1).build()
+ t_env = TableEnvironment.create(t_config)
+
+ field_names = ["word", "cout"]
+ field_types = [DataTypes.STRING, DataTypes.LONG]
+
+ # register Orders table in table environment
+ t_env.register_table_source(
+ "Word",
+ CsvTableSource(source_path, field_names, field_types))
+
+ # register Results table in table environment
+ tmp_dir = tempfile.gettempdir()
+ tmp_csv = tmp_dir + '/streaming2.csv'
+ if os.path.isfile(tmp_csv):
+ os.remove(tmp_csv)
+
+ t_env.register_table_sink(
+ "Results",
+ field_names, field_types, CsvTableSink(tmp_csv))
+
+ t_env.scan("Word") \
+ .group_by("word") \
+ .select("word, count(1) as count") \
+ .insert_into("Results")
+
+ t_env.execute()
+
+
+if __name__ == '__main__':
+ word_count()