You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2022/04/01 08:16:47 UTC

[zeppelin] branch master updated: [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 148129e  [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode
148129e is described below

commit 148129e4029009f1a6131144917262d2ec71d730
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Mar 17 10:34:19 2022 +0800

    [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode
    
    ### What is this PR for?
    
    Trivial PR to check that the PyFlink folder exists, and to throw a meaningful error when it doesn't exist or is not a directory.
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5669
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4320 from zjffdu/ZEPPELIN-5669 and squashes the following commits:
    
    769d0aa984 [Jeff Zhang] [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode
---
 .../main/java/org/apache/zeppelin/flink/Flink112Shims.java   | 12 ++++++++----
 .../main/java/org/apache/zeppelin/flink/Flink113Shims.java   | 12 ++++++++----
 .../main/java/org/apache/zeppelin/flink/Flink114Shims.java   | 12 ++++++++----
 3 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index e182f97..757e7a4 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -125,20 +125,24 @@ public class Flink112Shims extends FlinkShims {
     if ("yarn-application".equalsIgnoreCase(mode)) {
       // for yarn application mode, FLINK_HOME is container working directory
       String flinkHome = new File(".").getAbsolutePath();
-      return getPyFlinkPythonPath(flinkHome + "/lib/python");
+      return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
     }
 
     String flinkHome = System.getenv("FLINK_HOME");
     if (StringUtils.isNotBlank(flinkHome)) {
-      return getPyFlinkPythonPath(flinkHome + "/opt/python");
+      return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
     } else {
       throw new IOException("No FLINK_HOME is specified");
     }
   }
 
-  private String getPyFlinkPythonPath(String pyFlinkFolder) {
+  private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
     LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
-    List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
+    if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
+      throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
+              pyFlinkFolder.getAbsolutePath()));
+    }
+    List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
     StringBuilder builder = new StringBuilder();
     for (File file : depFiles) {
       LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index a4743e8..792174a 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -128,20 +128,24 @@ public class Flink113Shims extends FlinkShims {
     if ("yarn-application".equalsIgnoreCase(mode)) {
       // for yarn application mode, FLINK_HOME is container working directory
       String flinkHome = new File(".").getAbsolutePath();
-      return getPyFlinkPythonPath(flinkHome + "/lib/python");
+      return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
     }
 
     String flinkHome = System.getenv("FLINK_HOME");
     if (StringUtils.isNotBlank(flinkHome)) {
-      return getPyFlinkPythonPath(flinkHome + "/opt/python");
+      return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
     } else {
       throw new IOException("No FLINK_HOME is specified");
     }
   }
 
-  private String getPyFlinkPythonPath(String pyFlinkFolder) {
+  private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
     LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
-    List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
+    if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
+      throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
+              pyFlinkFolder.getAbsolutePath()));
+    }
+    List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
     StringBuilder builder = new StringBuilder();
     for (File file : depFiles) {
       LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
index e23ac54..2e4e4b4 100644
--- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
@@ -125,20 +125,24 @@ public class Flink114Shims extends FlinkShims {
     if ("yarn-application".equalsIgnoreCase(mode)) {
       // for yarn application mode, FLINK_HOME is container working directory
       String flinkHome = new File(".").getAbsolutePath();
-      return getPyFlinkPythonPath(flinkHome + "/lib/python");
+      return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
     }
 
     String flinkHome = System.getenv("FLINK_HOME");
     if (StringUtils.isNotBlank(flinkHome)) {
-      return getPyFlinkPythonPath(flinkHome + "/opt/python");
+      return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
     } else {
       throw new IOException("No FLINK_HOME is specified");
     }
   }
 
-  private String getPyFlinkPythonPath(String pyFlinkFolder) {
+  private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
     LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
-    List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
+    if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
+      throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
+              pyFlinkFolder.getAbsolutePath()));
+    }
+    List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
     StringBuilder builder = new StringBuilder();
     for (File file : depFiles) {
       LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());