You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/05/28 07:35:07 UTC

[flink] 01/02: [FLINK-12327][python] Adds support to submit Python Table API job in CliFrontend This closes #8472

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ec61cf21276fb6eea599b40ea999613ffeef48ef
Author: Huang Xingbo <hx...@gmail.com>
AuthorDate: Fri May 17 11:17:23 2019 +0800

    [FLINK-12327][python] Adds support to submit Python Table API job in CliFrontend
    This closes #8472
---
 docs/ops/cli.md                                    |  66 ++++++
 docs/ops/cli.zh.md                                 |  63 +++++-
 .../org/apache/flink/client/cli/CliFrontend.java   |  56 ++++--
 .../apache/flink/client/cli/CliFrontendParser.java |  29 +++
 .../apache/flink/client/cli/ProgramOptions.java    |  74 ++++++-
 .../flink/client/program/PackagedProgram.java      |  32 ++-
 .../apache/flink/client/python/PythonDriver.java   | 168 ++++++++++++++++
 .../flink/client/python/PythonGatewayServer.java   |  21 +-
 .../org/apache/flink/client/python/PythonUtil.java | 223 +++++++++++++++++++++
 .../flink/client/python/PythonDriverTest.java      | 104 ++++++++++
 .../apache/flink/client/python/PythonUtilTest.java | 118 +++++++++++
 flink-dist/pom.xml                                 |   3 +
 flink-dist/src/main/assemblies/bin.xml             |   7 +
 flink-python/pyflink/find_flink_home.py            |   3 +
 flink-python/pyflink/java_gateway.py               |  14 +-
 .../pyflink/table/examples/batch/__init__.py       |  17 ++
 .../pyflink/table/examples/batch/word_count.py     |  79 ++++++++
 17 files changed, 1032 insertions(+), 45 deletions(-)

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index b414b30..505207d 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -47,6 +47,12 @@ available.
 {:toc}
 
 ## Examples
+### Job Submission Examples
+-----------------------------
+
+These examples show how to submit a job using the CLI.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 
 -   Run example program with no arguments:
 
@@ -88,6 +94,53 @@ available.
                                ./examples/batch/WordCount.jar \
                                --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
 
+</div>
+
+<div data-lang="python" markdown="1">
+
+-   Run Python Table program:
+
+        ./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+-   Run Python Table program with pyFiles:
+
+        ./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar> \
+                                -pyfs file:///user.txt,hdfs:///$namenode_address/username.txt
+
+-   Run Python Table program with pyFiles and pyModule:
+
+        ./bin/flink run -pym batch.word_count -pyfs examples/python/table/batch -j <path/to/flink-table.jar>
+
+-   Run Python Table program with parallelism 16:
+
+        ./bin/flink run -p 16 -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+-   Run Python Table program with flink log output disabled:
+
+        ./bin/flink run -q -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+-   Run Python Table program in detached mode:
+
+        ./bin/flink run -d -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+-   Run Python Table program on a specific JobManager:
+
+        ./bin/flink run -m myJMHost:8081 \
+                               -py examples/python/table/batch/word_count.py \
+                               -j <path/to/flink-table.jar>
+
+-   Run Python Table program using a [per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
+
+        ./bin/flink run -m yarn-cluster -yn 2 \
+                               -py examples/python/table/batch/word_count.py \
+                               -j <path/to/flink-table.jar>
+</div>
+
+### Job Management Examples
+-----------------------------
+
+These examples show how to manage a job using the CLI.
+
 -   Display the optimized execution plan for the WordCount example program as JSON:
 
         ./bin/flink info ./examples/batch/WordCount.jar \
@@ -251,6 +304,19 @@ Action "run" compiles and runs a program.
                                           program. Optional flag to override the
                                           default value specified in the
                                           configuration.
+     -py,--python <python-file>           Python script with the program entry 
+                                          point. The dependent resources can be 
+                                          configured with the `--pyFiles` option.
+     -pyfs,--pyFiles <python-files>       Attach custom python files for job. 
+                                          Comma can be used as the separator to 
+                                          specify multiple files. The standard 
+                                          python resource file suffixes such as 
+                                          .py/.egg/.zip are all supported.
+                                          (eg:--pyFiles file:///tmp/myresource.zip
+                                          ,hdfs:///$namenode_address/myresource2.zip)
+     -pym,--pyModule <python-module>      Python module with the program entry 
+                                          point. This option must be used in 
+                                          conjunction with `--pyFiles`.
      -q,--sysoutLogging                   If present, suppress logging output to
                                           standard out.
      -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index 7c02047..93f16fb 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -47,6 +47,12 @@ available.
 {:toc}
 
 ## Examples
+### 作业提交示例
+-----------------------------
+
+这些示例展示了如何通过命令行提交一个作业。
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 
 -   Run example program with no arguments:
 
@@ -82,11 +88,57 @@ available.
                                ./examples/batch/WordCount.jar \
                                --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
 
--   Run example program using a [per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
+-   Run example program using a [per-job YARN cluster]({{site.baseurl}}/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
 
         ./bin/flink run -m yarn-cluster -yn 2 \
                                ./examples/batch/WordCount.jar \
                                --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
+                               
+</div>
+
+<div data-lang="python" markdown="1">
+
+-   提交一个Python Table的作业:
+
+        ./bin/flink run -py WordCount.py -j <path/to/flink-table.jar>
+
+-   提交一个有多个依赖的Python Table的作业:
+
+        ./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar> \
+                                -pyfs file:///user.txt,hdfs:///$namenode_address/username.txt
+
+-   提交一个有多个依赖的Python Table的作业,Python作业的主入口通过pym选项指定:
+
+        ./bin/flink run -pym batch.word_count -pyfs examples/python/table/batch -j <path/to/flink-table.jar>
+
+-   提交一个指定并发度为16的Python Table的作业:
+
+        ./bin/flink run -p 16 -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+-   提交一个关闭flink日志输出的Python Table的作业:
+
+        ./bin/flink run -q -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+-   提交一个运行在detached模式下的Python Table的作业:
+
+        ./bin/flink run -d -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
+
+-   提交一个运行在指定JobManager上的Python Table的作业:
+
+        ./bin/flink run -m myJMHost:8081 \
+                            -py examples/python/table/batch/word_count.py \
+                            -j <path/to/flink-table.jar>
+
+-   提交一个运行在有两个TaskManager的[per-job YARN cluster]({{site.baseurl}}/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn)的Python Table的作业:
+
+        ./bin/flink run -m yarn-cluster -yn 2 \
+                                 -py examples/python/table/batch/word_count.py \
+                                 -j <path/to/flink-table.jar>
+                                 
+</div>
+
+### 作业管理示例
+-----------------------------
 
 -   Display the optimized execution plan for the WordCount example program as JSON:
 
@@ -251,6 +303,15 @@ Action "run" compiles and runs a program.
                                           program. Optional flag to override the
                                           default value specified in the
                                           configuration.
+     -py,--python <python-file>           指定Python作业的入口,依赖的资源文件可以通过
+                                          `--pyFiles`进行指定。
+     -pyfs,--pyFiles <python-files>       指定Python作业依赖的一些自定义的python文件,
+                                          如果有多个文件,可以通过逗号(,)进行分隔。支持
+                                          常用的python资源文件,例如(.py/.egg/.zip)。
+                                          (例如:--pyFiles file:///tmp/myresource.zip
+                                          ,hdfs:///$namenode_address/myresource2.zip)
+     -pym,--pyModule <python-module>      指定python程序的运行的模块入口,这个选项必须配合
+                                          `--pyFiles`一起使用。
      -q,--sysoutLogging                   If present, suppress logging output to
                                           standard out.
      -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c6b5c9a..c591e6e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -32,6 +32,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.client.python.PythonDriver;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -185,8 +186,11 @@ public class CliFrontend {
 			return;
 		}
 
-		if (runOptions.getJarFilePath() == null) {
-			throw new CliArgsException("The program JAR file was not specified.");
+		if (!runOptions.isPython()) {
+			// Java program should be specified a JAR file
+			if (runOptions.getJarFilePath() == null) {
+				throw new CliArgsException("Java program should be specified a JAR file.");
+			}
 		}
 
 		final PackagedProgram program;
@@ -771,12 +775,42 @@ public class CliFrontend {
 		String jarFilePath = options.getJarFilePath();
 		List<URL> classpaths = options.getClasspaths();
 
-		if (jarFilePath == null) {
-			throw new IllegalArgumentException("The program JAR file was not specified.");
+		String entryPointClass;
+		File jarFile = null;
+		if (options.isPython()) {
+			// If the job is specified a jar file
+			if (jarFilePath != null) {
+				jarFile = getJarFile(jarFilePath);
+			}
+			// The entry point class of python job is PythonDriver
+			entryPointClass = PythonDriver.class.getCanonicalName();
+		} else {
+			if (jarFilePath == null) {
+				throw new IllegalArgumentException("The program JAR file was not specified.");
+			}
+			jarFile = getJarFile(jarFilePath);
+			// Get assembler class
+			entryPointClass = options.getEntryPointClassName();
 		}
 
-		File jarFile = new File(jarFilePath);
+		PackagedProgram program = entryPointClass == null ?
+				new PackagedProgram(jarFile, classpaths, programArgs) :
+				new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
+
+		program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
+
+		return program;
+	}
 
+	/**
+	 * Gets the JAR file from the path.
+	 *
+	 * @param jarFilePath The path of JAR file
+	 * @return The JAR file
+	 * @throws FileNotFoundException The JAR file does not exist.
+	 */
+	private File getJarFile(String jarFilePath) throws FileNotFoundException {
+		File jarFile = new File(jarFilePath);
 		// Check if JAR file exists
 		if (!jarFile.exists()) {
 			throw new FileNotFoundException("JAR file does not exist: " + jarFile);
@@ -784,17 +818,7 @@ public class CliFrontend {
 		else if (!jarFile.isFile()) {
 			throw new FileNotFoundException("JAR file is not a file: " + jarFile);
 		}
-
-		// Get assembler class
-		String entryPointClass = options.getEntryPointClassName();
-
-		PackagedProgram program = entryPointClass == null ?
-				new PackagedProgram(jarFile, classpaths, programArgs) :
-				new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
-
-		program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
-
-		return program;
+		return jarFile;
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index cea3998..5872a54 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -118,6 +118,20 @@ public class CliFrontendParser {
 	public static final Option STOP_AND_DRAIN = new Option("d", "drain", false,
 			"Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");
 
+	static final Option PY_OPTION = new Option("py", "python", true,
+		"Python script with the program entry point. " +
+			"The dependent resources can be configured with the `--pyFiles` option.");
+
+	static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
+		"Attach custom python files for job. " +
+			"Comma can be used as the separator to specify multiple files. " +
+			"The standard python resource file suffixes such as .py/.egg/.zip are all supported." +
+			"(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)");
+
+	static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
+		"Python module with the program entry point. " +
+			"This option must be used in conjunction with `--pyFiles`.");
+
 	static {
 		HELP_OPTION.setRequired(false);
 
@@ -165,6 +179,15 @@ public class CliFrontendParser {
 		STOP_WITH_SAVEPOINT.setOptionalArg(true);
 
 		STOP_AND_DRAIN.setRequired(false);
+
+		PY_OPTION.setRequired(false);
+		PY_OPTION.setArgName("python");
+
+		PYFILES_OPTION.setRequired(false);
+		PYFILES_OPTION.setArgName("pyFiles");
+
+		PYMODULE_OPTION.setRequired(false);
+		PYMODULE_OPTION.setArgName("pyModule");
 	}
 
 	private static final Options RUN_OPTIONS = getRunCommandOptions();
@@ -186,6 +209,9 @@ public class CliFrontendParser {
 		options.addOption(DETACHED_OPTION);
 		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		options.addOption(YARN_DETACHED_OPTION);
+		options.addOption(PY_OPTION);
+		options.addOption(PYFILES_OPTION);
+		options.addOption(PYMODULE_OPTION);
 		return options;
 	}
 
@@ -196,6 +222,9 @@ public class CliFrontendParser {
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
 		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
+		options.addOption(PY_OPTION);
+		options.addOption(PYFILES_OPTION);
+		options.addOption(PYMODULE_OPTION);
 		return options;
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index da03d64..30b3867 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -36,6 +36,9 @@ import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PY_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 
@@ -62,17 +65,71 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final SavepointRestoreSettings savepointSettings;
 
+	/**
+	 * Flag indicating whether the job is a Python job.
+	 */
+	private final boolean isPython;
+
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
 		super(line);
 
 		String[] args = line.hasOption(ARGS_OPTION.getOpt()) ?
-				line.getOptionValues(ARGS_OPTION.getOpt()) :
-				line.getArgs();
+			line.getOptionValues(ARGS_OPTION.getOpt()) :
+			line.getArgs();
+
+		isPython = line.hasOption(PY_OPTION.getOpt()) | line.hasOption(PYMODULE_OPTION.getOpt());
+		// If specified the option -py(--python)
+		if (line.hasOption(PY_OPTION.getOpt())) {
+			// Cannot use option -py and -pym simultaneously.
+			if (line.hasOption(PYMODULE_OPTION.getOpt())) {
+				throw new CliArgsException("Cannot use option -py and -pym simultaneously.");
+			}
+			// The cli cmd args which will be transferred to PythonDriver will be transformed as follows:
+			// CLI cmd : -py ${python.py} pyfs [optional] ${py-files} [optional] ${other args}.
+			// PythonDriver args: py ${python.py} [optional] pyfs [optional] ${py-files} [optional] ${other args}.
+			// -------------------------------transformed-------------------------------------------------------
+			// e.g. -py wordcount.py(CLI cmd) -----------> py wordcount.py(PythonDriver args)
+			// e.g. -py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(CLI cmd)
+			// 	-----> py wordcount.py pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(PythonDriver args)
+			String[] newArgs;
+			int argIndex;
+			if (line.hasOption(PYFILES_OPTION.getOpt())) {
+				newArgs = new String[args.length + 4];
+				newArgs[2] = PYFILES_OPTION.getOpt();
+				newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
+				argIndex = 4;
+			} else {
+				newArgs = new String[args.length + 2];
+				argIndex = 2;
+			}
+			newArgs[0] = PY_OPTION.getOpt();
+			newArgs[1] = line.getOptionValue(PY_OPTION.getOpt());
+			System.arraycopy(args, 0, newArgs, argIndex, args.length);
+			args = newArgs;
+		}
+
+		// If specified the option -pym(--pyModule)
+		if (line.hasOption(PYMODULE_OPTION.getOpt())) {
+			// If you specify the option -pym, you should specify the option --pyFiles simultaneously.
+			if (!line.hasOption(PYFILES_OPTION.getOpt())) {
+				throw new CliArgsException("-pym must be used in conjunction with `--pyFiles`");
+			}
+			// The cli cmd args which will be transferred to PythonDriver will be transformed as follows:
+			// CLI cmd : -pym ${py-module} -pyfs ${py-files} [optional] ${other args}.
+			// PythonDriver args: pym ${py-module} pyfs ${py-files} [optional] ${other args}.
+			// e.g. -pym AAA.fun -pyfs AAA.zip(CLI cmd) ----> pym AAA.fun -pyfs AAA.zip(PythonDriver args)
+			String[] newArgs = new String[args.length + 4];
+			newArgs[0] = PYMODULE_OPTION.getOpt();
+			newArgs[1] = line.getOptionValue(PYMODULE_OPTION.getOpt());
+			newArgs[2] = PYFILES_OPTION.getOpt();
+			newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
+			System.arraycopy(args, 0, newArgs, 4, args.length);
+			args = newArgs;
+		}
 
 		if (line.hasOption(JAR_OPTION.getOpt())) {
 			this.jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
-		}
-		else if (args.length > 0) {
+		} else if (!isPython && args.length > 0) {
 			jarFilePath = args[0];
 			args = Arrays.copyOfRange(args, 1, args.length);
 		}
@@ -95,7 +152,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		this.classpaths = classpaths;
 
 		this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
-				line.getOptionValue(CLASS_OPTION.getOpt()) : null;
+			line.getOptionValue(CLASS_OPTION.getOpt()) : null;
 
 		if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
 			String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
@@ -156,4 +213,11 @@ public abstract class ProgramOptions extends CommandLineOptions {
 	public SavepointRestoreSettings getSavepointRestoreSettings() {
 		return savepointSettings;
 	}
+
+	/**
+	 * Indicates whether the job is a Python job.
+	 */
+	public boolean isPython() {
+		return isPython;
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 8f5ccba..77b5d29 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.client.python.PythonDriver;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
@@ -91,6 +92,11 @@ public class PackagedProgram {
 	private SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
 
 	/**
+	 * Flag indicating whether the job is a Python job.
+	 */
+	private final boolean isPython;
+
+	/**
 	 * Creates an instance that wraps the plan defined in the jar file using the given
 	 * argument.
 	 *
@@ -169,18 +175,21 @@ public class PackagedProgram {
 	 *         may be a missing / wrong class or manifest files.
 	 */
 	public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
-		if (jarFile == null) {
-			throw new IllegalArgumentException("The jar file must not be null.");
-		}
+		// Whether the job is a Python job.
+		isPython = entryPointClassName != null && entryPointClassName.equals(PythonDriver.class.getCanonicalName());
 
-		URL jarFileUrl;
-		try {
-			jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
-		} catch (MalformedURLException e1) {
-			throw new IllegalArgumentException("The jar file path is invalid.");
-		}
+		URL jarFileUrl = null;
+		if (jarFile != null) {
+			try {
+				jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
+			} catch (MalformedURLException e1) {
+				throw new IllegalArgumentException("The jar file path is invalid.");
+			}
 
-		checkJarFile(jarFileUrl);
+			checkJarFile(jarFileUrl);
+		} else if (!isPython) {
+			throw new IllegalArgumentException("The jar file must not be null.");
+		}
 
 		this.jarFile = jarFileUrl;
 		this.args = args == null ? new String[0] : args;
@@ -191,7 +200,7 @@ public class PackagedProgram {
 		}
 
 		// now that we have an entry point, we can extract the nested jar files (if any)
-		this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
+		this.extractedTempLibraries = jarFileUrl == null ? Collections.emptyList() : extractContainedLibraries(jarFileUrl);
 		this.classpaths = classpaths;
 		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
 
@@ -233,6 +242,7 @@ public class PackagedProgram {
 
 		// load the entry point class
 		this.mainClass = entryPointClass;
+		isPython = entryPointClass == PythonDriver.class;
 
 		// if the entry point is a program, instantiate the class and get the plan
 		if (Program.class.isAssignableFrom(this.mainClass)) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java
new file mode 100644
index 0000000..e43a24e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import py4j.GatewayServer;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A main class used to launch Python applications. It executes python as a
+ * subprocess and then has it connect back to the JVM to access system properties, etc.
+ */
+public class PythonDriver {
+	private static final Logger LOG = LoggerFactory.getLogger(PythonDriver.class);
+
+	public static void main(String[] args) {
+		// the python job needs at least 2 args.
+		// e.g. py a.py ...
+		// e.g. pym a.b -pyfs a.zip ...
+		if (args.length < 2) {
+			LOG.error("Required at least two arguments, only python file or python module is available.");
+			System.exit(1);
+		}
+		// parse args
+		Map<String, List<String>> parsedArgs = parseOptions(args);
+		// start gateway server
+		GatewayServer gatewayServer = startGatewayServer();
+		// prepare python env
+
+		// map filename to its Path
+		Map<String, Path> filePathMap = new HashMap<>();
+		// commands which will be exec in python progress.
+		List<String> commands = constructPythonCommands(filePathMap, parsedArgs);
+		try {
+			// prepare the exec environment of python progress.
+			PythonUtil.PythonEnvironment pythonEnv = PythonUtil.preparePythonEnvironment(filePathMap);
+			// set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python progress.
+			pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
+			// start the python process.
+			Process pythonProcess = PythonUtil.startPythonProcess(pythonEnv, commands);
+			int exitCode = pythonProcess.waitFor();
+			if (exitCode != 0) {
+				throw new RuntimeException("Python process exits with code: " + exitCode);
+			}
+		} catch (Throwable e) {
+			LOG.error("Run python process failed", e);
+		} finally {
+			gatewayServer.shutdown();
+		}
+	}
+
+	/**
+	 * Creates a GatewayServer run in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	public static GatewayServer startGatewayServer() {
+		InetAddress localhost = InetAddress.getLoopbackAddress();
+		GatewayServer gatewayServer = new GatewayServer.GatewayServerBuilder()
+			.javaPort(0)
+			.javaAddress(localhost)
+			.build();
+		Thread thread = new Thread(gatewayServer::start);
+		thread.setName("py4j-gateway");
+		thread.setDaemon(true);
+		thread.start();
+		try {
+			thread.join();
+		} catch (InterruptedException e) {
+			LOG.error("The gateway server thread join failed.", e);
+			System.exit(1);
+		}
+		return gatewayServer;
+	}
+
+	/**
+	 * Constructs the Python commands which will be executed in python process.
+	 *
+	 * @param filePathMap stores python file name to its path
+	 * @param parsedArgs  parsed args
+	 */
+	public static List<String> constructPythonCommands(Map<String, Path> filePathMap, Map<String, List<String>> parsedArgs) {
+		List<String> commands = new ArrayList<>();
+		if (parsedArgs.containsKey("py")) {
+			String pythonFile = parsedArgs.get("py").get(0);
+			Path pythonFilePath = new Path(pythonFile);
+			filePathMap.put(pythonFilePath.getName(), pythonFilePath);
+			commands.add(pythonFilePath.getName());
+		}
+		if (parsedArgs.containsKey("pym")) {
+			String pyModule = parsedArgs.get("pym").get(0);
+			commands.add("-m");
+			commands.add(pyModule);
+		}
+		if (parsedArgs.containsKey("pyfs")) {
+			List<String> pyFiles = parsedArgs.get("pyfs");
+			for (String pyFile : pyFiles) {
+				Path pyFilePath = new Path(pyFile);
+				filePathMap.put(pyFilePath.getName(), pyFilePath);
+			}
+		}
+		if (parsedArgs.containsKey("args")) {
+			commands.addAll(parsedArgs.get("args"));
+		}
+		return commands;
+	}
+
+	/**
+	 * Parses the args to the map format.
+	 *
+	 * @param args ["py", "xxx.py",
+	 *             "pyfs", "a.py,b.py,c.py",
+	 *             "--input", "in.txt"]
+	 * @return {"py"->List("xxx.py"),"pyfs"->List("a.py","b.py","c.py"),"args"->List("--input","in.txt")}
+	 */
+	public static Map<String, List<String>> parseOptions(String[] args) {
+		Map<String, List<String>> parsedArgs = new HashMap<>();
+		int argIndex = 0;
+		boolean isEntrypointSpecified = false;
+		// valid args should include python or pyModule field and their value.
+		if (args[0].equals("py") || args[0].equals("pym")) {
+			parsedArgs.put(args[0], Collections.singletonList(args[1]));
+			argIndex = 2;
+			isEntrypointSpecified = true;
+		}
+		if (isEntrypointSpecified && args.length > 2 && args[2].equals("pyfs")) {
+			List<String> pyFilesList = new ArrayList<>(Arrays.asList(args[3].split(",")));
+			parsedArgs.put(args[2], pyFilesList);
+			argIndex = 4;
+		}
+		if (!isEntrypointSpecified) {
+			throw new RuntimeException("The Python entrypoint has not been specified. It can be specified with option -py or -pym");
+		}
+		// if arg include other args, the key "args" will map to other args.
+		if (args.length > argIndex) {
+			List<String> otherArgList = new ArrayList<>(args.length - argIndex);
+			otherArgList.addAll(Arrays.asList(args).subList(argIndex, args.length));
+			parsedArgs.put("args", otherArgList);
+		}
+		return parsedArgs;
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
index 6432a67..64f2ef1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.nio.file.Files;
 
 /**
  * The Py4j Gateway Server provides RPC service for user's python process.
@@ -56,15 +57,17 @@ public class PythonGatewayServer {
 		// Tells python side the port of our java rpc server
 		String handshakeFilePath = System.getenv("_PYFLINK_CONN_INFO_PATH");
 		File handshakeFile = new File(handshakeFilePath);
-		if (handshakeFile.createNewFile()) {
-			FileOutputStream fileOutputStream = new FileOutputStream(handshakeFile);
-			DataOutputStream stream = new DataOutputStream(fileOutputStream);
-			stream.writeInt(boundPort);
-			stream.close();
-			fileOutputStream.close();
-		} else {
-			System.out.println("Can't create handshake file: " + handshakeFilePath + ", now exit...");
-			return;
+		File tmpPath = Files.createTempFile(handshakeFile.getParentFile().toPath(),
+			"connection", ".info").toFile();
+		FileOutputStream fileOutputStream = new FileOutputStream(tmpPath);
+		DataOutputStream stream = new DataOutputStream(fileOutputStream);
+		stream.writeInt(boundPort);
+		stream.close();
+		fileOutputStream.close();
+
+		if (!tmpPath.renameTo(handshakeFile)) {
+			System.out.println("Unable to write connection information to handshake file: " + handshakeFilePath + ", now exit...");
+			System.exit(1);
 		}
 
 		// Exit on EOF or broken pipe.  This ensures that the server dies
diff --git a/flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java
new file mode 100644
index 0000000..b9012a3
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Utility class that helps to prepare the Python environment and to run the Python process.
+ */
+public final class PythonUtil {
+	private static final Logger LOG = LoggerFactory.getLogger(PythonUtil.class);
+
+	private static final String FLINK_OPT_DIR = System.getenv("FLINK_OPT_DIR");
+
+	private static final String FLINK_OPT_DIR_PYTHON = FLINK_OPT_DIR + File.separator + "python";
+
+	/**
+	 * Plain holder for the settings a Python process is started with.
+	 */
+	public static class PythonEnvironment {
+		/** Executable placed in front of the command line of the process. */
+		public String pythonExec = "python";
+
+		/** Directory the process is started in. */
+		public String workingDirectory;
+
+		/** Value exported to the process as PYTHONPATH. */
+		public String pythonPath;
+
+		/** Extra environment variables handed to the process. */
+		Map<String, String> systemEnv = new HashMap<>();
+	}
+
+	/**
+	 * Shutdown hook that kills the python process and afterwards removes its temporary
+	 * working directory.
+	 */
+	private static class ShutDownPythonHook extends Thread {
+		private final Process pythonProcess;
+		private final String workingDir;
+
+		public ShutDownPythonHook(Process pythonProcess, String workingDir) {
+			this.pythonProcess = pythonProcess;
+			this.workingDir = workingDir;
+		}
+
+		@Override
+		public void run() {
+			pythonProcess.destroyForcibly();
+
+			// nothing to clean up when no working directory is known
+			if (workingDir == null) {
+				return;
+			}
+			FileUtils.deleteDirectoryQuietly(new File(workingDir));
+		}
+	}
+
+
+	/**
+	 * Builds the {@link PythonEnvironment} a python process is started with.
+	 *
+	 * <p>A fresh temporary working directory is set up, the pyflink libs are linked into it and
+	 * the given user files are copied into it. The working directory, every linked lib and every
+	 * copied file become entries of the PYTHONPATH of the returned environment. I/O failures
+	 * while creating the directory or copying a file are logged and do not abort the preparation.
+	 *
+	 * @param filePathMap map file name to its file path.
+	 * @return PythonEnvironment the Python environment which will be executed in Python process.
+	 */
+	public static PythonEnvironment preparePythonEnvironment(Map<String, Path> filePathMap) {
+		// 1. setup temporary local directory for the user files
+		final Path workingDirPath = new Path(
+			System.getProperty("java.io.tmpdir") + File.separator + "pyflink" + UUID.randomUUID());
+		try {
+			final FileSystem fileSystem = workingDirPath.getFileSystem();
+			if (fileSystem.exists(workingDirPath)) {
+				fileSystem.delete(workingDirPath, true);
+			}
+			fileSystem.mkdirs(workingDirPath);
+		} catch (IOException e) {
+			LOG.error("Prepare tmp directory failed.", e);
+		}
+
+		final PythonEnvironment env = new PythonEnvironment();
+		env.workingDirectory = workingDirPath.toString();
+
+		// entries of the PYTHONPATH, joined with the path separator once all are known
+		final List<String> pythonPathEntries = new ArrayList<>();
+		pythonPathEntries.add(env.workingDirectory);
+
+		// 2. create symbolLink in the working directory for the pyflink dependency libs.
+		for (java.nio.file.Path lib : getLibFiles(FLINK_OPT_DIR_PYTHON)) {
+			java.nio.file.Path link = FileSystems.getDefault().getPath(
+				env.workingDirectory, lib.getFileName().toString());
+			createSymbolicLinkForPyflinkLib(lib, link);
+			pythonPathEntries.add(link.toString());
+		}
+
+		// 3. copy relevant python files to tmp dir and set them in PYTHONPATH.
+		for (Map.Entry<String, Path> userFile : filePathMap.entrySet()) {
+			Path target = new Path(workingDirPath, userFile.getKey());
+			try {
+				FileUtils.copy(userFile.getValue(), target, true);
+			} catch (IOException e) {
+				LOG.error("Copy files to tmp dir failed", e);
+			}
+			pythonPathEntries.add(target.toString());
+		}
+
+		env.pythonPath = String.join(File.pathSeparator, pythonPathEntries);
+		return env;
+	}
+
+	/**
+	 * Gets pyflink dependent libs in specified directory.
+	 *
+	 * @param libDir The lib directory
+	 */
+	public static List<java.nio.file.Path> getLibFiles(String libDir) {
+		final List<java.nio.file.Path> libFiles = new ArrayList<>();
+		SimpleFileVisitor<java.nio.file.Path> finder = new SimpleFileVisitor<java.nio.file.Path>() {
+			@Override
+			public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException {
+				// exclude .txt file
+				if (!file.toString().endsWith(".txt")) {
+					libFiles.add(file);
+				}
+				return FileVisitResult.CONTINUE;
+			}
+		};
+		try {
+			Files.walkFileTree(FileSystems.getDefault().getPath(libDir), finder);
+		} catch (IOException e) {
+			LOG.error("Gets pyflink dependent libs failed.", e);
+		}
+		return libFiles;
+	}
+
+	/**
+	 * Creates symbolLink in working directory for pyflink lib.
+	 *
+	 * @param libPath          the pyflink lib file path.
+	 * @param symbolicLinkPath the symbolic link to pyflink lib.
+	 */
+	public static void createSymbolicLinkForPyflinkLib(java.nio.file.Path libPath, java.nio.file.Path symbolicLinkPath) {
+		try {
+			Files.createSymbolicLink(symbolicLinkPath, libPath);
+		} catch (IOException e) {
+			LOG.error("Create symbol link for pyflink lib failed.", e);
+			LOG.info("Try to copy pyflink lib to working directory");
+			try {
+				Files.copy(libPath, symbolicLinkPath);
+			} catch (IOException ex) {
+				LOG.error("Copy pylink lib to working directory failed", ex);
+			}
+		}
+	}
+
+	/**
+	 * Starts python process.
+	 *
+	 * <p>The process runs in the working directory of the given environment, gets its
+	 * PYTHONPATH and extra environment variables from it, and writes both stdout and stderr to
+	 * the output of this JVM. A shutdown hook is registered that kills the process and deletes
+	 * the working directory when the JVM exits.
+	 *
+	 * @param pythonEnv the python Environment which will be in a process.
+	 * @param commands  the commands that python process will execute; the list itself is not modified.
+	 * @return the process represent the python process.
+	 * @throws IOException Thrown if an error occurred when python process start.
+	 */
+	public static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> commands) throws IOException {
+		// Assemble the command line in a fresh list instead of inserting into the caller's list:
+		// the argument may be unmodifiable, and a reused list must not collect one more
+		// interpreter entry per launch.
+		List<String> fullCommand = new ArrayList<>(commands.size() + 1);
+		fullCommand.add(pythonEnv.pythonExec);
+		fullCommand.addAll(commands);
+
+		ProcessBuilder pythonProcessBuilder = new ProcessBuilder(fullCommand);
+		Map<String, String> env = pythonProcessBuilder.environment();
+		env.put("PYTHONPATH", pythonEnv.pythonPath);
+		// applied after PYTHONPATH, so an entry with the same key replaces it
+		pythonEnv.systemEnv.forEach(env::put);
+		// set the working directory.
+		pythonProcessBuilder.directory(new File(pythonEnv.workingDirectory));
+		// redirect the stderr to stdout
+		pythonProcessBuilder.redirectErrorStream(true);
+		// set the child process the output same as the parent process.
+		pythonProcessBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
+		Process process = pythonProcessBuilder.start();
+		if (!process.isAlive()) {
+			throw new RuntimeException("Failed to start Python process. ");
+		}
+
+		// Make sure that the python sub process will be killed when JVM exit
+		ShutDownPythonHook hook = new ShutDownPythonHook(process, pythonEnv.workingDirectory);
+		Runtime.getRuntime().addShutdownHook(hook);
+
+		return process;
+	}
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java b/flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
new file mode 100644
index 0000000..0b6f570
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Assert;
+import org.junit.Test;
+import py4j.GatewayServer;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link PythonDriver}.
+ */
+public class PythonDriverTest {
+	@Test
+	public void testStartGatewayServer() {
+		GatewayServer gatewayServer = PythonDriver.startGatewayServer();
+		// try-with-resources closes the probe socket even if the assertion fails
+		try (Socket socket = new Socket("localhost", gatewayServer.getListeningPort())) {
+			// a JUnit assertion is used because the assert keyword only runs with -ea
+			Assert.assertTrue(socket.isConnected());
+		} catch (IOException e) {
+			// keep the cause so the reason of the failed connection is reported
+			throw new RuntimeException("Connect Gateway Server failed", e);
+		} finally {
+			gatewayServer.shutdown();
+		}
+	}
+
+	@Test
+	public void testConstructCommands() {
+		Map<String, Path> filePathMap = new HashMap<>();
+		Map<String, List<String>> parseArgs = new HashMap<>();
+		parseArgs.put("py", Collections.singletonList("xxx.py"));
+		List<String> pyFilesList = new ArrayList<>();
+		pyFilesList.add("a.py");
+		pyFilesList.add("b.py");
+		pyFilesList.add("c.py");
+		parseArgs.put("pyfs", pyFilesList);
+		List<String> otherArgs = new ArrayList<>();
+		otherArgs.add("--input");
+		otherArgs.add("in.txt");
+		parseArgs.put("args", otherArgs);
+
+		List<String> commands = PythonDriver.constructPythonCommands(filePathMap, parseArgs);
+
+		// the entry script and each file passed via "pyfs" has to show up in the file map
+		// under its own file name
+		for (String fileName : new String[] {"xxx.py", "a.py", "b.py", "c.py"}) {
+			Path registeredPath = filePathMap.get(fileName);
+			Assert.assertNotNull(registeredPath);
+			Assert.assertEquals(fileName, registeredPath.getName());
+		}
+
+		// assertEquals takes the expected value first, which keeps failure messages correct
+		Assert.assertEquals(3, commands.size());
+		Assert.assertEquals("xxx.py", commands.get(0));
+		Assert.assertEquals("--input", commands.get(1));
+		Assert.assertEquals("in.txt", commands.get(2));
+	}
+
+	@Test
+	public void testParseOptions() {
+		String[] args = {"py", "xxx.py", "pyfs", "a.py,b.py,c.py", "--input", "in.txt"};
+		Map<String, List<String>> parsedArgs = PythonDriver.parseOptions(args);
+
+		// Only JUnit assertions are used: the assert keyword is skipped when the JVM runs
+		// without -ea, which would let a broken result pass unnoticed.
+		List<String> pythonMainFile = parsedArgs.get("py");
+		Assert.assertNotNull(pythonMainFile);
+		Assert.assertEquals(1, pythonMainFile.size());
+		Assert.assertEquals(args[1], pythonMainFile.get(0));
+
+		List<String> pyFilesList = parsedArgs.get("pyfs");
+		Assert.assertNotNull(pyFilesList);
+		String[] pyFiles = args[3].split(",");
+		Assert.assertEquals(pyFiles.length, pyFilesList.size());
+		for (int i = 0; i < pyFiles.length; i++) {
+			Assert.assertEquals(pyFiles[i], pyFilesList.get(i));
+		}
+
+		// everything behind the pyfs value has to be passed through unchanged
+		List<String> otherArgs = parsedArgs.get("args");
+		Assert.assertNotNull(otherArgs);
+		Assert.assertEquals(args.length - 4, otherArgs.size());
+		for (int i = 4; i < args.length; i++) {
+			Assert.assertEquals(args[i], otherArgs.get(i - 4));
+		}
+	}
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java b/flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java
new file mode 100644
index 0000000..4b14ced
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests for the {@link PythonUtil}.
+ */
+public class PythonUtilTest {
+	private Path sourceTmpDirPath;
+	private Path targetTmpDirPath;
+	private FileSystem sourceFs;
+	private FileSystem targetFs;
+
+	@Before
+	public void prepareTestEnvironment() {
+		String tmpRoot = System.getProperty("java.io.tmpdir") + File.separator;
+		sourceTmpDirPath = new Path(tmpRoot + "source_" + UUID.randomUUID());
+		targetTmpDirPath = new Path(tmpRoot + "target_" + UUID.randomUUID());
+		try {
+			sourceFs = sourceTmpDirPath.getFileSystem();
+			recreateDirectory(sourceFs, sourceTmpDirPath);
+			targetFs = targetTmpDirPath.getFileSystem();
+			recreateDirectory(targetFs, targetTmpDirPath);
+		} catch (IOException e) {
+			// keep the cause so the reason of the failed setup shows up in the test report
+			throw new RuntimeException("initial PythonUtil test environment failed", e);
+		}
+	}
+
+	/**
+	 * Removes whatever exists at the given path and creates an empty directory there.
+	 */
+	private static void recreateDirectory(FileSystem fs, Path dir) throws IOException {
+		if (fs.exists(dir)) {
+			fs.delete(dir, true);
+		}
+		fs.mkdirs(dir);
+	}
+
+	@Test
+	public void testStartPythonProcess() {
+		PythonUtil.PythonEnvironment pythonEnv = new PythonUtil.PythonEnvironment();
+		pythonEnv.workingDirectory = targetTmpDirPath.toString();
+		pythonEnv.pythonPath = targetTmpDirPath.toString();
+		List<String> commands = new ArrayList<>();
+		Path pyPath = new Path(targetTmpDirPath, "word_count.py");
+		try {
+			// Create the empty script file. The returned stream is closed right away so that no
+			// file handle stays open while the content is written below.
+			targetFs.create(pyPath, FileSystem.WriteMode.OVERWRITE).close();
+			File pyFile = new File(pyPath.toString());
+			// the script writes "hello world" into the file named by its first argument
+			String pyProgram = "#!/usr/bin/python\n" +
+				"# -*- coding: UTF-8 -*-\n" +
+				"import sys\n" +
+				"\n" +
+				"if __name__=='__main__':\n" +
+				"\tfilename = sys.argv[1]\n" +
+				"\tfo = open(filename, \"w\")\n" +
+				"\tfo.write( \"hello world\")\n" +
+				"\tfo.close()";
+			Files.write(pyFile.toPath(), pyProgram.getBytes(), StandardOpenOption.WRITE);
+			Path result = new Path(targetTmpDirPath, "word_count_result.txt");
+			commands.add(pyFile.getName());
+			commands.add(result.getName());
+			Process pythonProcess = PythonUtil.startPythonProcess(pythonEnv, commands);
+			int exitCode = pythonProcess.waitFor();
+			if (exitCode != 0) {
+				throw new RuntimeException("Python process exits with code: " + exitCode);
+			}
+			String cmdResult = new String(Files.readAllBytes(new File(result.toString()).toPath()));
+			// expected value first, so that a failure message names the right side
+			Assert.assertEquals("hello world", cmdResult);
+			pythonProcess.destroyForcibly();
+			targetFs.delete(pyPath, true);
+			targetFs.delete(result, true);
+		} catch (IOException | InterruptedException e) {
+			// keep the cause so the stack trace of the real failure is not lost
+			throw new RuntimeException("test start Python process failed " + e.getMessage(), e);
+		}
+	}
+
+	@After
+	public void cleanEnvironment() {
+		// removes the two temporary directories created for the test
+		try {
+			sourceFs.delete(sourceTmpDirPath, true);
+			targetFs.delete(targetTmpDirPath, true);
+		} catch (IOException e) {
+			// keep the cause so the stack trace of the failed cleanup is not lost
+			throw new RuntimeException("delete tmp dir failed " + e.getMessage(), e);
+		}
+	}
+}
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 52a6466..1350f10 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -572,6 +572,9 @@ under the License.
 								<relocation>
 									<pattern>py4j</pattern>
 									<shadedPattern>org.apache.flink.api.python.py4j</shadedPattern>
+									<includes>
+										<include>py4j.*</include>
+									</includes>
 								</relocation>
 							</relocations>
 						</configuration>
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 3eb5698..788ec1b 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -242,6 +242,13 @@ under the License.
 			<fileMode>0755</fileMode>
 		</fileSet>
 
+		<!-- copy python table example to examples of dist -->
+		<fileSet>
+			<directory>../flink-python/pyflink/table/examples</directory>
+			<outputDirectory>examples/python/table</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
+
 	</fileSets>
 
 </assembly>
diff --git a/flink-python/pyflink/find_flink_home.py b/flink-python/pyflink/find_flink_home.py
index 0491368..9806490 100644
--- a/flink-python/pyflink/find_flink_home.py
+++ b/flink-python/pyflink/find_flink_home.py
@@ -28,6 +28,9 @@ def _find_flink_home():
     # If the environment has set FLINK_HOME, trust it.
     if 'FLINK_HOME' in os.environ:
         return os.environ['FLINK_HOME']
+    elif 'FLINK_ROOT_DIR' in os.environ:
+        os.environ['FLINK_HOME'] = os.environ['FLINK_ROOT_DIR']
+        return os.environ['FLINK_ROOT_DIR']
     else:
         try:
             flink_root_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../")
diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index e5c8330..b218d23 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -28,7 +28,6 @@ from threading import RLock
 from py4j.java_gateway import java_import, JavaGateway, GatewayParameters
 from pyflink.find_flink_home import _find_flink_home
 
-
 _gateway = None
 _lock = RLock()
 
@@ -46,6 +45,9 @@ def get_gateway():
                 _gateway = JavaGateway(gateway_parameters=gateway_param)
             else:
                 _gateway = launch_gateway()
+
+            # import the flink view
+            import_flink_view(_gateway)
     return _gateway
 
 
@@ -97,6 +99,14 @@ def launch_gateway():
     gateway = JavaGateway(
         gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True))
 
+    return gateway
+
+
+def import_flink_view(gateway):
+    """
+    import the classes used by PyFlink.
+    :param gateway:gateway connected to JavaGateWayServer
+    """
     # Import the classes used by PyFlink
     java_import(gateway.jvm, "org.apache.flink.table.api.*")
     java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
@@ -109,5 +119,3 @@ def launch_gateway():
     java_import(gateway.jvm, "org.apache.flink.api.java.ExecutionEnvironment")
     java_import(gateway.jvm,
                 "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment")
-
-    return gateway
diff --git a/flink-python/pyflink/table/examples/batch/__init__.py b/flink-python/pyflink/table/examples/batch/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-python/pyflink/table/examples/batch/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py
new file mode 100644
index 0000000..a324af4
--- /dev/null
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -0,0 +1,79 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import tempfile
+
+from pyflink.table import TableEnvironment, TableConfig
+from pyflink.table.table_sink import CsvTableSink
+from pyflink.table.table_source import CsvTableSource
+from pyflink.table.types import DataTypes
+
+
+# TODO: word_count.py is just a test example for the CLI.
+#  Once pyflink is aligned with the Java Table API connectors, this example will be improved.
+def word_count():
+    tmp_dir = tempfile.gettempdir()
+    source_path = tmp_dir + '/streaming.csv'
+    if os.path.isfile(source_path):
+        os.remove(source_path)
+    content = "line Licensed to the Apache Software Foundation ASF under one " \
+              "line or more contributor license agreements See the NOTICE file " \
+              "line distributed with this work for additional information " \
+              "line regarding copyright ownership The ASF licenses this file " \
+              "to you under the Apache License Version the " \
+              "License you may not use this file except in compliance " \
+              "with the License"
+
+    with open(source_path, 'w') as f:
+        for word in content.split(" "):
+            f.write(",".join([word, "1"]))
+            f.write("\n")
+            f.flush()
+        f.close()
+
+    t_config = TableConfig.Builder().as_batch_execution().set_parallelism(1).build()
+    t_env = TableEnvironment.create(t_config)
+
+    field_names = ["word", "cout"]
+    field_types = [DataTypes.STRING, DataTypes.LONG]
+
+    # register Orders table in table environment
+    t_env.register_table_source(
+        "Word",
+        CsvTableSource(source_path, field_names, field_types))
+
+    # register Results table in table environment
+    tmp_dir = tempfile.gettempdir()
+    tmp_csv = tmp_dir + '/streaming2.csv'
+    if os.path.isfile(tmp_csv):
+        os.remove(tmp_csv)
+
+    t_env.register_table_sink(
+        "Results",
+        field_names, field_types, CsvTableSink(tmp_csv))
+
+    t_env.scan("Word") \
+        .group_by("word") \
+        .select("word, count(1) as count") \
+        .insert_into("Results")
+
+    t_env.execute()
+
+
+if __name__ == '__main__':
+    word_count()