You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/07/16 02:46:15 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4955]. Invalid user jar cause unable to shutdown flink yarn session cluster

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new f485742  [ZEPPELIN-4955]. Invalid user jar cause unable to shutdown flink yarn  session cluster
f485742 is described below

commit f485742405b0382291ca9132422e70362b84dc7a
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Jul 14 17:55:28 2020 +0800

    [ZEPPELIN-4955]. Invalid user jar cause unable to shutdown flink yarn  session cluster
    
    ### What is this PR for?
    
    This PR checks whether the specified jars exist. Without this check, an invalid user jar makes it impossible to shut down the flink yarn session cluster in yarn interpreter mode.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4955
    
    ### How should this be tested?
    * CI pass
    
    https://travis-ci.org/github/zjffdu/zeppelin/builds/707928916
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3855 from zjffdu/ZEPPELIN-4955 and squashes the following commits:
    
    80bdd917c [Jeff Zhang] [ZEPPELIN-4955]. Invalid user jar cause unable to shutdown flink yarn session cluster
    
    (cherry picked from commit 132c25ed0d7e868f9120b193ada03af286cb8333)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../main/java/org/apache/zeppelin/flink/HadoopUtils.java  |  3 +++
 .../org/apache/zeppelin/flink/FlinkScalaInterpreter.scala | 15 +++++++++++++--
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
index e8cefba..85d33f0 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
@@ -72,6 +72,9 @@ public class HadoopUtils {
     File tmpDir = Files.createTempDir();
     FileSystem fs = FileSystem.get(new Configuration());
     Path sourcePath = fs.makeQualified(new Path(jarOnHdfs));
+    if (!fs.exists(sourcePath)) {
+      throw new IOException("jar file: " + jarOnHdfs + " doesn't exist.");
+    }
     Path destPath = new Path(tmpDir.getAbsolutePath() + "/" + sourcePath.getName());
     fs.copyToLocalFile(sourcePath, destPath);
     return new File(destPath.toString()).getAbsolutePath();
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index d18a559..2f397c1 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.zeppelin.flink
 
-import java.io.{BufferedReader, File}
+import java.io.{BufferedReader, File, IOException}
 import java.net.{URL, URLClassLoader}
 import java.nio.file.Files
 import java.util.Properties
@@ -786,7 +786,18 @@ class FlinkScalaInterpreter(val properties: Properties) {
   }
 
   private def getOrDownloadJars(jars: Seq[String]): Seq[String] = {
-    jars.map(jar => if (jar.contains("://")) HadoopUtils.downloadJar(jar) else jar)
+    jars.map(jar => {
+      if (jar.contains("://")) {
+        HadoopUtils.downloadJar(jar)
+      } else {
+        val jarFile = new File(jar)
+        if (!jarFile.exists() || !jarFile.isFile) {
+          throw new Exception(s"jar file: ${jar} doesn't exist")
+        } else {
+          jar
+        }
+      }
+    })
   }
 
   def getJobManager = this.jobManager