You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/20 14:44:25 UTC

[1/3] flink git commit: [FLINK-8108][py] Fix bounds check

Repository: flink
Updated Branches:
  refs/heads/release-1.4 d0ea9d951 -> ee4420f4b


[FLINK-8108][py] Fix bounds check


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eadda35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eadda35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eadda35

Branch: refs/heads/release-1.4
Commit: 8eadda357c3739af84070cec5a8c227b55b04390
Parents: d0ea9d9
Author: zentol <ch...@apache.org>
Authored: Mon Nov 20 12:57:43 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 15:44:03 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/python/api/PythonPlanBinder.java     |  9 ++++-----
 .../org/apache/flink/python/api/PythonPlanBinderTest.java | 10 ++++++++++
 2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8eadda35/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 810c8cd..b7adde1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -91,11 +91,6 @@ public class PythonPlanBinder {
 	 * @throws Exception
 	 */
 	public static void main(String[] args) throws Exception {
-		if (args.length < 2) {
-			System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
-			return;
-		}
-
 		Configuration globalConfig = GlobalConfiguration.loadConfiguration();
 		PythonPlanBinder binder = new PythonPlanBinder(globalConfig);
 		binder.runPlan(args);
@@ -126,6 +121,10 @@ public class PythonPlanBinder {
 	}
 
 	void runPlan(String[] args) throws Exception {
+		if (args.length < 1) {
+			throw new IllegalArgumentException("Missing script file argument. Usage: ./bin/pyflink.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
+		}
+
 		int split = 0;
 		for (int x = 0; x < args.length; x++) {
 			if (args[x].equals("-")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8eadda35/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 2a19a5f..9e63091 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -118,6 +118,7 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
+		testBoundCheck();
 		String utils = findUtilsFile();
 		String python2 = getPython2Path();
 		if (python2 != null) {
@@ -136,4 +137,13 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 			}
 		}
 	}
+
+	private void testBoundCheck() throws Exception {
+		log.info("Running testBoundCheck.");
+		try {
+			new PythonPlanBinder(new Configuration()).runPlan(new String[0]);
+		} catch (IllegalArgumentException expected) {
+			// we expect this exception to be thrown since no argument was passed
+		}
+	}
 }


[3/3] flink git commit: [FLINK-8114][py] Fix forwarding of arguments

Posted by ch...@apache.org.
[FLINK-8114][py] Fix forwarding of arguments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee4420f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee4420f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee4420f4

Branch: refs/heads/release-1.4
Commit: ee4420f4b879ab385a4458c38af762ac0738148c
Parents: 497c36f
Author: zentol <ch...@apache.org>
Authored: Mon Nov 20 15:07:32 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 15:44:12 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      |  3 +-
 .../flink/python/api/PythonPlanBinderTest.java  | 36 ++++++++++++++++----
 .../flink/python/api/args/multiple_args.py      | 32 +++++++++++++++++
 .../org/apache/flink/python/api/args/no_arg.py  | 32 +++++++++++++++++
 4 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee4420f4/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 3fec947..864ea30 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -117,7 +117,8 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 
 		String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH);
 
-		process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")});
+		String arguments = config.getString(PLAN_ARGUMENTS_KEY, "");
+		process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + arguments);
 		outPrinter = new Thread(new StreamPrinter(process.getInputStream()));
 		outPrinter.start();
 		errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));

http://git-wip-us.apache.org/repos/asf/flink/blob/ee4420f4/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 55cf1dc..92a985c 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -37,18 +37,19 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 		return true;
 	}
 
-	private static String findUtilsFile() throws Exception {
+	private static Path getBaseTestPythonDir() {
 		FileSystem fs = FileSystem.getLocalFileSystem();
-		return fs.getWorkingDirectory().toString()
-				+ "/src/test/python/org/apache/flink/python/api/utils/utils.py";
+		return new Path(fs.getWorkingDirectory(), "src/test/python/org/apache/flink/python/api");
+	}
+
+	private static String findUtilsFile() throws Exception {
+		return new Path(getBaseTestPythonDir(), "utils/utils.py").toString();
 	}
 
 	private static List<String> findTestFiles() throws Exception {
 		List<String> files = new ArrayList<>();
 		FileSystem fs = FileSystem.getLocalFileSystem();
-		FileStatus[] status = fs.listStatus(
-				new Path(fs.getWorkingDirectory().toString()
-						+ "/src/test/python/org/apache/flink/python/api"));
+		FileStatus[] status = fs.listStatus(getBaseTestPythonDir());
 		for (FileStatus f : status) {
 			String file = f.getPath().toString();
 			if (file.endsWith(".py")) {
@@ -126,11 +127,13 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 		if (python2 != null) {
 			log.info("Running python2 tests");
 			runTestPrograms(python2);
+			runArgvTestPrograms(python2);
 		}
 		String python3 = getPython3Path();
 		if (python3 != null) {
 			log.info("Running python3 tests");
 			runTestPrograms(python3);
+			runArgvTestPrograms(python3);
 		}
 	}
 
@@ -177,4 +180,25 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 			// we expect this exception to be thrown since no argument was passed
 		}
 	}
+
+	private void runArgvTestPrograms(String pythonBinary) throws Exception {
+		log.info("Running runArgvTestPrograms.");
+		String utils = findUtilsFile();
+
+		{
+			String noArgTestPath = new Path(getBaseTestPythonDir(), "args/no_arg.py").toString();
+
+			Configuration configuration = new Configuration();
+			configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+			new PythonPlanBinder(configuration).runPlan(new String[]{noArgTestPath, utils});
+		}
+
+		{
+			String multiArgTestPath = new Path(getBaseTestPythonDir(), "args/multiple_args.py").toString();
+
+			Configuration configuration = new Configuration();
+			configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+			new PythonPlanBinder(configuration).runPlan(new String[]{multiArgTestPath, utils, "-", "hello", "world"});
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee4420f4/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
new file mode 100644
index 0000000..57b44c3
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(len(sys.argv))
+
+    d1.map_partition(Verify([3], "MultipleArguments")).output()
+
+    #Execution
+    env.set_parallelism(1)
+
+    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/ee4420f4/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
new file mode 100644
index 0000000..6afe7f2
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(len(sys.argv))
+    
+    d1.map_partition(Verify([1], "NoArgument")).output()
+
+    #Execution
+    env.set_parallelism(1)
+
+    env.execute(local=True)


[2/3] flink git commit: [FLINK-8109][py] Check for existence of plan/additional files

Posted by ch...@apache.org.
[FLINK-8109][py] Check for existence of plan/additional files


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/497c36f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/497c36f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/497c36f0

Branch: refs/heads/release-1.4
Commit: 497c36f0d25c0277832a682226f6f08ba7c83635
Parents: 8eadda3
Author: zentol <ch...@apache.org>
Authored: Mon Nov 20 12:58:27 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 15:44:08 2017 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonPlanBinder.java      | 21 +++++++-
 .../flink/python/api/PythonPlanBinderTest.java  | 53 ++++++++++++++++----
 2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/497c36f0/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index b7adde1..e0c8215 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
@@ -93,7 +94,12 @@ public class PythonPlanBinder {
 	public static void main(String[] args) throws Exception {
 		Configuration globalConfig = GlobalConfiguration.loadConfiguration();
 		PythonPlanBinder binder = new PythonPlanBinder(globalConfig);
-		binder.runPlan(args);
+		try {
+			binder.runPlan(args);
+		} catch (Exception e) {
+			System.out.println("Failed to run plan: " + e.getMessage());
+			LOG.error("Failed to run plan.", e);
+		}
 	}
 
 	public PythonPlanBinder(Configuration globalConfig) {
@@ -146,11 +152,22 @@ public class PythonPlanBinder {
 
 			operatorConfig.setString(PLAN_ARGUMENTS_KEY, planArguments);
 
+			Path planPath = new Path(planFile);
+			if (!FileSystem.getUnguardedFileSystem(planPath.toUri()).exists(planPath)) {
+				throw new FileNotFoundException("Plan file " + planFile + " does not exist.");
+			}
+			for (String file : filesToCopy) {
+				Path filePath = new Path(file);
+				if (!FileSystem.getUnguardedFileSystem(filePath.toUri()).exists(filePath)) {
+					throw new FileNotFoundException("Additional file " + file + " does not exist.");
+				}
+			}
+
 			// copy flink library, plan file and additional files to temporary location
 			Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
 			deleteIfExists(tmpPlanFilesPath);
 			FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false);
-			copyFile(new Path(planFile), tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
+			copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
 			for (String file : filesToCopy) {
 				Path source = new Path(file);
 				copyFile(source, tmpPlanFilesPath, source.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/497c36f0/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 9e63091..55cf1dc 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
@@ -119,22 +120,52 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 		testBoundCheck();
-		String utils = findUtilsFile();
+		testNotExistingPlanFile();
+		testNotExistingAdditionalFile();
 		String python2 = getPython2Path();
 		if (python2 != null) {
-			for (String file : findTestFiles()) {
-				Configuration configuration = new Configuration();
-				configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python2);
-				new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
-			}
+			log.info("Running python2 tests");
+			runTestPrograms(python2);
 		}
 		String python3 = getPython3Path();
 		if (python3 != null) {
-			for (String file : findTestFiles()) {
-				Configuration configuration = new Configuration();
-				configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python3);
-				new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
-			}
+			log.info("Running python3 tests");
+			runTestPrograms(python3);
+		}
+	}
+
+	private void runTestPrograms(String pythonBinary) throws Exception {
+		String utils = findUtilsFile();
+		for (String file : findTestFiles()) {
+			log.info("Running file {}.", file);
+			Configuration configuration = new Configuration();
+			configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+			new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
+		}
+	}
+
+	private void testNotExistingPlanFile() throws Exception {
+		log.info("Running testNotExistingPlanFile.");
+		String utils = findUtilsFile();
+		String nonExistingPlan = utils + "abc";
+		Configuration configuration = new Configuration();
+		try {
+			new PythonPlanBinder(configuration).runPlan(new String[]{nonExistingPlan});
+		} catch (FileNotFoundException expected) {
+			// we expect this exception to be thrown since the plan file does not exist
+		}
+	}
+
+	private void testNotExistingAdditionalFile() throws Exception {
+		log.info("Running testNotExistingAdditionalFile.");
+		String utils = findUtilsFile();
+		String planFile = findTestFiles().iterator().next();
+		String nonExistingLibrary = utils + "abc";
+		Configuration configuration = new Configuration();
+		try {
+			new PythonPlanBinder(configuration).runPlan(new String[]{planFile, utils, nonExistingLibrary});
+		} catch (FileNotFoundException expected) {
+			// we expect this exception to be thrown since the plan file does not exist
 		}
 	}