You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/20 14:43:49 UTC
[1/3] flink git commit: [FLINK-8109][py] Check for existence of plan/additional files
Repository: flink
Updated Branches:
refs/heads/master e5ab4053b -> b0a4a6770
[FLINK-8109][py] Check for existence of plan/additional files
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/316fa1fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/316fa1fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/316fa1fd
Branch: refs/heads/master
Commit: 316fa1fd59b80f453db5900437ae553ae24a5966
Parents: 8ec3fce
Author: zentol <ch...@apache.org>
Authored: Mon Nov 20 12:58:27 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 14:37:52 2017 +0100
----------------------------------------------------------------------
.../flink/python/api/PythonPlanBinder.java | 21 +++++++-
.../flink/python/api/PythonPlanBinderTest.java | 53 ++++++++++++++++----
2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/316fa1fd/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index b7adde1..e0c8215 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
@@ -93,7 +94,12 @@ public class PythonPlanBinder {
public static void main(String[] args) throws Exception {
Configuration globalConfig = GlobalConfiguration.loadConfiguration();
PythonPlanBinder binder = new PythonPlanBinder(globalConfig);
- binder.runPlan(args);
+ try {
+ binder.runPlan(args);
+ } catch (Exception e) {
+ System.out.println("Failed to run plan: " + e.getMessage());
+ LOG.error("Failed to run plan.", e);
+ }
}
public PythonPlanBinder(Configuration globalConfig) {
@@ -146,11 +152,22 @@ public class PythonPlanBinder {
operatorConfig.setString(PLAN_ARGUMENTS_KEY, planArguments);
+ Path planPath = new Path(planFile);
+ if (!FileSystem.getUnguardedFileSystem(planPath.toUri()).exists(planPath)) {
+ throw new FileNotFoundException("Plan file " + planFile + " does not exist.");
+ }
+ for (String file : filesToCopy) {
+ Path filePath = new Path(file);
+ if (!FileSystem.getUnguardedFileSystem(filePath.toUri()).exists(filePath)) {
+ throw new FileNotFoundException("Additional file " + file + " does not exist.");
+ }
+ }
+
// copy flink library, plan file and additional files to temporary location
Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
deleteIfExists(tmpPlanFilesPath);
FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false);
- copyFile(new Path(planFile), tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
+ copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
for (String file : filesToCopy) {
Path source = new Path(file);
copyFile(source, tmpPlanFilesPath, source.getName());
http://git-wip-us.apache.org/repos/asf/flink/blob/316fa1fd/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 9e63091..55cf1dc 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.python.api.streaming.data.PythonStreamer;
import org.apache.flink.test.util.JavaProgramTestBase;
import java.io.BufferedReader;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
@@ -119,22 +120,52 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
testBoundCheck();
- String utils = findUtilsFile();
+ testNotExistingPlanFile();
+ testNotExistingAdditionalFile();
String python2 = getPython2Path();
if (python2 != null) {
- for (String file : findTestFiles()) {
- Configuration configuration = new Configuration();
- configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python2);
- new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
- }
+ log.info("Running python2 tests");
+ runTestPrograms(python2);
}
String python3 = getPython3Path();
if (python3 != null) {
- for (String file : findTestFiles()) {
- Configuration configuration = new Configuration();
- configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python3);
- new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
- }
+ log.info("Running python3 tests");
+ runTestPrograms(python3);
+ }
+ }
+
+ private void runTestPrograms(String pythonBinary) throws Exception {
+ String utils = findUtilsFile();
+ for (String file : findTestFiles()) {
+ log.info("Running file {}.", file);
+ Configuration configuration = new Configuration();
+ configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+ new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
+ }
+ }
+
+ private void testNotExistingPlanFile() throws Exception {
+ log.info("Running testNotExistingPlanFile.");
+ String utils = findUtilsFile();
+ String nonExistingPlan = utils + "abc";
+ Configuration configuration = new Configuration();
+ try {
+ new PythonPlanBinder(configuration).runPlan(new String[]{nonExistingPlan});
+ } catch (FileNotFoundException expected) {
+ // we expect this exception to be thrown since the plan file does not exist
+ }
+ }
+
+ private void testNotExistingAdditionalFile() throws Exception {
+ log.info("Running testNotExistingAdditionalFile.");
+ String utils = findUtilsFile();
+ String planFile = findTestFiles().iterator().next();
+ String nonExistingLibrary = utils + "abc";
+ Configuration configuration = new Configuration();
+ try {
+ new PythonPlanBinder(configuration).runPlan(new String[]{planFile, utils, nonExistingLibrary});
+ } catch (FileNotFoundException expected) {
+ // we expect this exception to be thrown since the additional file does not exist
}
}
[2/3] flink git commit: [FLINK-8108][py] Fix bounds check
Posted by ch...@apache.org.
[FLINK-8108][py] Fix bounds check
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ec3fce7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ec3fce7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ec3fce7
Branch: refs/heads/master
Commit: 8ec3fce7bb48ef8621fa5b0f553c53e641117e0c
Parents: e5ab405
Author: zentol <ch...@apache.org>
Authored: Mon Nov 20 12:57:43 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 14:37:52 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/python/api/PythonPlanBinder.java | 9 ++++-----
.../org/apache/flink/python/api/PythonPlanBinderTest.java | 10 ++++++++++
2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ec3fce7/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 810c8cd..b7adde1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -91,11 +91,6 @@ public class PythonPlanBinder {
* @throws Exception
*/
public static void main(String[] args) throws Exception {
- if (args.length < 2) {
- System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
- return;
- }
-
Configuration globalConfig = GlobalConfiguration.loadConfiguration();
PythonPlanBinder binder = new PythonPlanBinder(globalConfig);
binder.runPlan(args);
@@ -126,6 +121,10 @@ public class PythonPlanBinder {
}
void runPlan(String[] args) throws Exception {
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Missing script file argument. Usage: ./bin/pyflink.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
+ }
+
int split = 0;
for (int x = 0; x < args.length; x++) {
if (args[x].equals("-")) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8ec3fce7/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 2a19a5f..9e63091 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -118,6 +118,7 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
+ testBoundCheck();
String utils = findUtilsFile();
String python2 = getPython2Path();
if (python2 != null) {
@@ -136,4 +137,13 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
}
}
}
+
+ private void testBoundCheck() throws Exception {
+ log.info("Running testBoundCheck.");
+ try {
+ new PythonPlanBinder(new Configuration()).runPlan(new String[0]);
+ } catch (IllegalArgumentException expected) {
+ // we expect this exception to be thrown since no argument was passed
+ }
+ }
}
[3/3] flink git commit: [FLINK-8114][py] Fix forwarding of arguments
Posted by ch...@apache.org.
[FLINK-8114][py] Fix forwarding of arguments
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a4a677
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a4a677
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a4a677
Branch: refs/heads/master
Commit: b0a4a67705c187920ac9151a2a0c6abe25b9488e
Parents: 316fa1f
Author: zentol <ch...@apache.org>
Authored: Mon Nov 20 15:07:32 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 15:39:04 2017 +0100
----------------------------------------------------------------------
.../api/streaming/data/PythonStreamer.java | 3 +-
.../flink/python/api/PythonPlanBinderTest.java | 36 ++++++++++++++++----
.../flink/python/api/args/multiple_args.py | 32 +++++++++++++++++
.../org/apache/flink/python/api/args/no_arg.py | 32 +++++++++++++++++
4 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 3fec947..864ea30 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -117,7 +117,8 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH);
- process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")});
+ String arguments = config.getString(PLAN_ARGUMENTS_KEY, "");
+ process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + arguments);
outPrinter = new Thread(new StreamPrinter(process.getInputStream()));
outPrinter.start();
errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 55cf1dc..92a985c 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -37,18 +37,19 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
return true;
}
- private static String findUtilsFile() throws Exception {
+ private static Path getBaseTestPythonDir() {
FileSystem fs = FileSystem.getLocalFileSystem();
- return fs.getWorkingDirectory().toString()
- + "/src/test/python/org/apache/flink/python/api/utils/utils.py";
+ return new Path(fs.getWorkingDirectory(), "src/test/python/org/apache/flink/python/api");
+ }
+
+ private static String findUtilsFile() throws Exception {
+ return new Path(getBaseTestPythonDir(), "utils/utils.py").toString();
}
private static List<String> findTestFiles() throws Exception {
List<String> files = new ArrayList<>();
FileSystem fs = FileSystem.getLocalFileSystem();
- FileStatus[] status = fs.listStatus(
- new Path(fs.getWorkingDirectory().toString()
- + "/src/test/python/org/apache/flink/python/api"));
+ FileStatus[] status = fs.listStatus(getBaseTestPythonDir());
for (FileStatus f : status) {
String file = f.getPath().toString();
if (file.endsWith(".py")) {
@@ -126,11 +127,13 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
if (python2 != null) {
log.info("Running python2 tests");
runTestPrograms(python2);
+ runArgvTestPrograms(python2);
}
String python3 = getPython3Path();
if (python3 != null) {
log.info("Running python3 tests");
runTestPrograms(python3);
+ runArgvTestPrograms(python3);
}
}
@@ -177,4 +180,25 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
// we expect this exception to be thrown since no argument was passed
}
}
+
+ private void runArgvTestPrograms(String pythonBinary) throws Exception {
+ log.info("Running runArgvTestPrograms.");
+ String utils = findUtilsFile();
+
+ {
+ String noArgTestPath = new Path(getBaseTestPythonDir(), "args/no_arg.py").toString();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+ new PythonPlanBinder(configuration).runPlan(new String[]{noArgTestPath, utils});
+ }
+
+ {
+ String multiArgTestPath = new Path(getBaseTestPythonDir(), "args/multiple_args.py").toString();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+ new PythonPlanBinder(configuration).runPlan(new String[]{multiArgTestPath, utils, "-", "hello", "world"});
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
new file mode 100644
index 0000000..57b44c3
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+ env = get_environment()
+
+ d1 = env.from_elements(len(sys.argv))
+
+ d1.map_partition(Verify([3], "MultipleArguments")).output()
+
+ #Execution
+ env.set_parallelism(1)
+
+ env.execute(local=True)
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
new file mode 100644
index 0000000..6afe7f2
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+ env = get_environment()
+
+ d1 = env.from_elements(len(sys.argv))
+
+ d1.map_partition(Verify([1], "NoArgument")).output()
+
+ #Execution
+ env.set_parallelism(1)
+
+ env.execute(local=True)