You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/20 14:43:51 UTC
[3/3] flink git commit: [FLINK-8114][py] Fix forwarding of arguments
[FLINK-8114][py] Fix forwarding of arguments
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a4a677
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a4a677
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a4a677
Branch: refs/heads/master
Commit: b0a4a67705c187920ac9151a2a0c6abe25b9488e
Parents: 316fa1f
Author: zentol <ch...@apache.org>
Authored: Mon Nov 20 15:07:32 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 15:39:04 2017 +0100
----------------------------------------------------------------------
.../api/streaming/data/PythonStreamer.java | 3 +-
.../flink/python/api/PythonPlanBinderTest.java | 36 ++++++++++++++++----
.../flink/python/api/args/multiple_args.py | 32 +++++++++++++++++
.../org/apache/flink/python/api/args/no_arg.py | 32 +++++++++++++++++
4 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 3fec947..864ea30 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -117,7 +117,8 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH);
- process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")});
+ String arguments = config.getString(PLAN_ARGUMENTS_KEY, "");
+ process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + arguments);
outPrinter = new Thread(new StreamPrinter(process.getInputStream()));
outPrinter.start();
errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 55cf1dc..92a985c 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -37,18 +37,19 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
return true;
}
- private static String findUtilsFile() throws Exception {
+ private static Path getBaseTestPythonDir() {
FileSystem fs = FileSystem.getLocalFileSystem();
- return fs.getWorkingDirectory().toString()
- + "/src/test/python/org/apache/flink/python/api/utils/utils.py";
+ return new Path(fs.getWorkingDirectory(), "src/test/python/org/apache/flink/python/api");
+ }
+
+ private static String findUtilsFile() throws Exception {
+ return new Path(getBaseTestPythonDir(), "utils/utils.py").toString();
}
private static List<String> findTestFiles() throws Exception {
List<String> files = new ArrayList<>();
FileSystem fs = FileSystem.getLocalFileSystem();
- FileStatus[] status = fs.listStatus(
- new Path(fs.getWorkingDirectory().toString()
- + "/src/test/python/org/apache/flink/python/api"));
+ FileStatus[] status = fs.listStatus(getBaseTestPythonDir());
for (FileStatus f : status) {
String file = f.getPath().toString();
if (file.endsWith(".py")) {
@@ -126,11 +127,13 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
if (python2 != null) {
log.info("Running python2 tests");
runTestPrograms(python2);
+ runArgvTestPrograms(python2);
}
String python3 = getPython3Path();
if (python3 != null) {
log.info("Running python3 tests");
runTestPrograms(python3);
+ runArgvTestPrograms(python3);
}
}
@@ -177,4 +180,25 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
// we expect this exception to be thrown since no argument was passed
}
}
+
+ private void runArgvTestPrograms(String pythonBinary) throws Exception {
+ log.info("Running runArgvTestPrograms.");
+ String utils = findUtilsFile();
+
+ {
+ String noArgTestPath = new Path(getBaseTestPythonDir(), "args/no_arg.py").toString();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+ new PythonPlanBinder(configuration).runPlan(new String[]{noArgTestPath, utils});
+ }
+
+ {
+ String multiArgTestPath = new Path(getBaseTestPythonDir(), "args/multiple_args.py").toString();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+ new PythonPlanBinder(configuration).runPlan(new String[]{multiArgTestPath, utils, "-", "hello", "world"});
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
new file mode 100644
index 0000000..57b44c3
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+ env = get_environment()
+
+ d1 = env.from_elements(len(sys.argv))
+
+ d1.map_partition(Verify([3], "MultipleArguments")).output()
+
+ #Execution
+ env.set_parallelism(1)
+
+ env.execute(local=True)
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
new file mode 100644
index 0000000..6afe7f2
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+ env = get_environment()
+
+ d1 = env.from_elements(len(sys.argv))
+
+ d1.map_partition(Verify([1], "NoArgument")).output()
+
+ #Execution
+ env.set_parallelism(1)
+
+ env.execute(local=True)