You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2022/04/01 08:16:47 UTC
[zeppelin] branch master updated: [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 148129e [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode
148129e is described below
commit 148129e4029009f1a6131144917262d2ec71d730
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Mar 17 10:34:19 2022 +0800
[ZEPPELIN-5669] Check pyflink folder existence in yarn application mode
### What is this PR for?
Trivial PR to check the existence of the folder, and throw a meaningful error when it doesn't exist.
### What type of PR is it?
[ Improvement ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5669
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4320 from zjffdu/ZEPPELIN-5669 and squashes the following commits:
769d0aa984 [Jeff Zhang] [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode
---
.../main/java/org/apache/zeppelin/flink/Flink112Shims.java | 12 ++++++++----
.../main/java/org/apache/zeppelin/flink/Flink113Shims.java | 12 ++++++++----
.../main/java/org/apache/zeppelin/flink/Flink114Shims.java | 12 ++++++++----
3 files changed, 24 insertions(+), 12 deletions(-)
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index e182f97..757e7a4 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -125,20 +125,24 @@ public class Flink112Shims extends FlinkShims {
if ("yarn-application".equalsIgnoreCase(mode)) {
// for yarn application mode, FLINK_HOME is container working directory
String flinkHome = new File(".").getAbsolutePath();
- return getPyFlinkPythonPath(flinkHome + "/lib/python");
+ return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
}
String flinkHome = System.getenv("FLINK_HOME");
if (StringUtils.isNotBlank(flinkHome)) {
- return getPyFlinkPythonPath(flinkHome + "/opt/python");
+ return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
} else {
throw new IOException("No FLINK_HOME is specified");
}
}
- private String getPyFlinkPythonPath(String pyFlinkFolder) {
+ private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
- List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
+ if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
+ throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
+ pyFlinkFolder.getAbsolutePath()));
+ }
+ List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
StringBuilder builder = new StringBuilder();
for (File file : depFiles) {
LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index a4743e8..792174a 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -128,20 +128,24 @@ public class Flink113Shims extends FlinkShims {
if ("yarn-application".equalsIgnoreCase(mode)) {
// for yarn application mode, FLINK_HOME is container working directory
String flinkHome = new File(".").getAbsolutePath();
- return getPyFlinkPythonPath(flinkHome + "/lib/python");
+ return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
}
String flinkHome = System.getenv("FLINK_HOME");
if (StringUtils.isNotBlank(flinkHome)) {
- return getPyFlinkPythonPath(flinkHome + "/opt/python");
+ return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
} else {
throw new IOException("No FLINK_HOME is specified");
}
}
- private String getPyFlinkPythonPath(String pyFlinkFolder) {
+ private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
- List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
+ if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
+ throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
+ pyFlinkFolder.getAbsolutePath()));
+ }
+ List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
StringBuilder builder = new StringBuilder();
for (File file : depFiles) {
LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
index e23ac54..2e4e4b4 100644
--- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
@@ -125,20 +125,24 @@ public class Flink114Shims extends FlinkShims {
if ("yarn-application".equalsIgnoreCase(mode)) {
// for yarn application mode, FLINK_HOME is container working directory
String flinkHome = new File(".").getAbsolutePath();
- return getPyFlinkPythonPath(flinkHome + "/lib/python");
+ return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
}
String flinkHome = System.getenv("FLINK_HOME");
if (StringUtils.isNotBlank(flinkHome)) {
- return getPyFlinkPythonPath(flinkHome + "/opt/python");
+ return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
} else {
throw new IOException("No FLINK_HOME is specified");
}
}
- private String getPyFlinkPythonPath(String pyFlinkFolder) {
+ private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
- List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
+ if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
+ throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
+ pyFlinkFolder.getAbsolutePath()));
+ }
+ List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
StringBuilder builder = new StringBuilder();
for (File file : depFiles) {
LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());