You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/20 14:43:51 UTC

[3/3] flink git commit: [FLINK-8114][py] Fix forwarding of arguments

[FLINK-8114][py] Fix forwarding of arguments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a4a677
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a4a677
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a4a677

Branch: refs/heads/master
Commit: b0a4a67705c187920ac9151a2a0c6abe25b9488e
Parents: 316fa1f
Author: zentol <ch...@apache.org>
Authored: Mon Nov 20 15:07:32 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 15:39:04 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      |  3 +-
 .../flink/python/api/PythonPlanBinderTest.java  | 36 ++++++++++++++++----
 .../flink/python/api/args/multiple_args.py      | 32 +++++++++++++++++
 .../org/apache/flink/python/api/args/no_arg.py  | 32 +++++++++++++++++
 4 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 3fec947..864ea30 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -117,7 +117,8 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 
 		String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH);
 
-		process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")});
+		String arguments = config.getString(PLAN_ARGUMENTS_KEY, "");
+		process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + arguments);
 		outPrinter = new Thread(new StreamPrinter(process.getInputStream()));
 		outPrinter.start();
 		errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 55cf1dc..92a985c 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -37,18 +37,19 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 		return true;
 	}
 
-	private static String findUtilsFile() throws Exception {
+	private static Path getBaseTestPythonDir() {
 		FileSystem fs = FileSystem.getLocalFileSystem();
-		return fs.getWorkingDirectory().toString()
-				+ "/src/test/python/org/apache/flink/python/api/utils/utils.py";
+		return new Path(fs.getWorkingDirectory(), "src/test/python/org/apache/flink/python/api");
+	}
+
+	private static String findUtilsFile() throws Exception {
+		return new Path(getBaseTestPythonDir(), "utils/utils.py").toString();
 	}
 
 	private static List<String> findTestFiles() throws Exception {
 		List<String> files = new ArrayList<>();
 		FileSystem fs = FileSystem.getLocalFileSystem();
-		FileStatus[] status = fs.listStatus(
-				new Path(fs.getWorkingDirectory().toString()
-						+ "/src/test/python/org/apache/flink/python/api"));
+		FileStatus[] status = fs.listStatus(getBaseTestPythonDir());
 		for (FileStatus f : status) {
 			String file = f.getPath().toString();
 			if (file.endsWith(".py")) {
@@ -126,11 +127,13 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 		if (python2 != null) {
 			log.info("Running python2 tests");
 			runTestPrograms(python2);
+			runArgvTestPrograms(python2);
 		}
 		String python3 = getPython3Path();
 		if (python3 != null) {
 			log.info("Running python3 tests");
 			runTestPrograms(python3);
+			runArgvTestPrograms(python3);
 		}
 	}
 
@@ -177,4 +180,25 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 			// we expect this exception to be thrown since no argument was passed
 		}
 	}
+
+	private void runArgvTestPrograms(String pythonBinary) throws Exception {
+		log.info("Running runArgvTestPrograms.");
+		String utils = findUtilsFile();
+
+		{
+			String noArgTestPath = new Path(getBaseTestPythonDir(), "args/no_arg.py").toString();
+
+			Configuration configuration = new Configuration();
+			configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+			new PythonPlanBinder(configuration).runPlan(new String[]{noArgTestPath, utils});
+		}
+
+		{
+			String multiArgTestPath = new Path(getBaseTestPythonDir(), "args/multiple_args.py").toString();
+
+			Configuration configuration = new Configuration();
+			configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+			new PythonPlanBinder(configuration).runPlan(new String[]{multiArgTestPath, utils, "-", "hello", "world"});
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
new file mode 100644
index 0000000..57b44c3
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(len(sys.argv))
+
+    d1.map_partition(Verify([3], "MultipleArguments")).output()
+
+    #Execution
+    env.set_parallelism(1)
+
+    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
new file mode 100644
index 0000000..6afe7f2
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(len(sys.argv))
+    
+    d1.map_partition(Verify([1], "NoArgument")).output()
+
+    #Execution
+    env.set_parallelism(1)
+
+    env.execute(local=True)