Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/18 03:02:59 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5412] Unable to cancel flink job due to fail to take savepoint

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new b22b7c8  [ZEPPELIN-5412] Unable to cancel flink job due to fail to take savepoint
b22b7c8 is described below

commit b22b7c8b0a94539210f92d40098225121790537c
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Jun 17 07:26:27 2021 +0800

    [ZEPPELIN-5412] Unable to cancel flink job due to fail to take savepoint
    
    ### What is this PR for?
    
    Sometimes a Flink job cannot be cancelled when savepoint is enabled, because taking the savepoint fails. This PR falls back to cancelling the job without a savepoint if stopping it with a savepoint fails.
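A minimal, self-contained sketch of the fallback described above. It assumes a hypothetical `StoppableJob` interface that stands in for the two `jobClient` calls used in the diff (`stopWithSavepoint(true, savePointDir)` and `cancel()`). `StoppableJob`, `SavepointFallback`, `FakeJob` and the savepoint path are illustrative names only and are not part of Zeppelin or Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical stand-in for the two jobClient calls used in the diff. */
interface StoppableJob {
  // Same shape as the call in the diff: completes with the savepoint path,
  // or completes exceptionally if the savepoint cannot be taken.
  CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDir);

  // Cancels the job without taking a savepoint.
  CompletableFuture<Void> cancel();
}

public class SavepointFallback {

  /**
   * Tries to stop the job with a savepoint; if the stop fails, falls back to
   * a plain cancel. Returns the savepoint path, or null when the fallback ran.
   */
  static String stopOrCancel(StoppableJob job, String savepointDir) throws Exception {
    try {
      return job.stopWithSavepoint(true, savepointDir).get();
    } catch (InterruptedException e) {
      // Do not swallow interrupts: restore the flag and propagate.
      Thread.currentThread().interrupt();
      throw e;
    } catch (ExecutionException e) {
      // The savepoint failed; log the cause and cancel without a savepoint.
      System.err.println("Stop with savepoint failed, cancelling without savepoint: " + e.getCause());
      job.cancel().get(); // wait until the cancel request has completed
      return null;
    }
  }

  /** Hypothetical in-memory job used only to exercise both code paths. */
  static final class FakeJob implements StoppableJob {
    private final boolean savepointFails;
    boolean cancelled = false;

    FakeJob(boolean savepointFails) {
      this.savepointFails = savepointFails;
    }

    @Override
    public CompletableFuture<String> stopWithSavepoint(boolean drain, String dir) {
      if (savepointFails) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("savepoint timed out"));
        return failed;
      }
      return CompletableFuture.completedFuture(dir + "/savepoint-0001");
    }

    @Override
    public CompletableFuture<Void> cancel() {
      cancelled = true;
      return CompletableFuture.completedFuture(null);
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(stopOrCancel(new FakeJob(false), "/tmp/sp")); // /tmp/sp/savepoint-0001
    FakeJob failing = new FakeJob(true);
    System.out.println(stopOrCancel(failing, "/tmp/sp"));            // null
    System.out.println(failing.cancelled);                           // true
  }
}
```

Unlike the committed change, which catches any `Exception` and calls `jobClient.cancel()` without waiting on its result, this sketch falls back only on `ExecutionException` (the wrapper `CompletableFuture.get()` throws when the future failed), restores the interrupt flag on `InterruptedException`, and blocks until the cancel future completes.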
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5412
    
    ### How should this be tested?
    * Manually tested
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4139 from zjffdu/ZEPPELIN-5412 and squashes the following commits:
    
    da99103249 [Jeff Zhang] logging exception
    ef3b16e956 [Jeff Zhang] [ZEPPELIN-5412] Unable to cancel flink job due to fail to take savepoint
    
    (cherry picked from commit ed18c3e6bfcb64322659c830dbc8ef787bdb3477)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../java/org/apache/zeppelin/flink/JobManager.java   | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 867422d..db8ae57 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -141,13 +141,19 @@ public class JobManager {
       } else {
         LOGGER.info("Trying to stop job of paragraph {} with save point dir: {}",
                 context.getParagraphId(), savePointDir);
-        String savePointPath = jobClient.stopWithSavepoint(true, savePointDir).get();
-        Map<String, String> config = new HashMap<>();
-        config.put(SAVEPOINT_PATH, savePointPath);
-        context.getIntpEventClient().updateParagraphConfig(
-                context.getNoteId(), context.getParagraphId(), config);
-        LOGGER.info("Job {} of paragraph {} is stopped with save point path: {}",
-                jobClient.getJobID(), context.getParagraphId(), savePointPath);
+        try {
+          String savePointPath = jobClient.stopWithSavepoint(true, savePointDir).get();
+          Map<String, String> config = new HashMap<>();
+          config.put(SAVEPOINT_PATH, savePointPath);
+          context.getIntpEventClient().updateParagraphConfig(
+                  context.getNoteId(), context.getParagraphId(), config);
+          LOGGER.info("Job {} of paragraph {} is stopped with save point path: {}",
+                  jobClient.getJobID(), context.getParagraphId(), savePointPath);
+        } catch (Exception e) {
+          LOGGER.warn("Fail to cancel job of paragraph {} with savepoint, try to cancel it without savepoint",
+                  context.getParagraphId(), e);
+          jobClient.cancel();
+        }
       }
       cancelled = true;
     } catch (Exception e) {