You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/02 08:50:51 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 3dff78c  [ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged
3dff78c is described below

commit 3dff78c87ffbfe6575afb4642b54e2cc14cc4aa1
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon May 17 22:03:21 2021 +0800

    [ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged
    
    ### What is this PR for?
    
    Simple PR that checks whether the latest checkpoint has changed and only updates the paragraph config when it has.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5374
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4120 from zjffdu/ZEPPELIN-5374 and squashes the following commits:
    
    79f269f24e [Jeff Zhang] [ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged
    
    (cherry picked from commit b173590a62144bfa20d4822b99283677edd61c16)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../src/main/java/org/apache/zeppelin/flink/JobManager.java           | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 5f40569..7242b04 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -188,6 +188,7 @@ public class JobManager {
     private AtomicBoolean running = new AtomicBoolean(true);
     private boolean isFirstPoll = true;
     private long checkInterval;
+    private String latestCheckpointPath;
 
     FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context, long checkInterval) {
       this.flinkWebUrl = flinkWebUrl;
@@ -253,11 +254,12 @@ public class JobManager {
               if (completedObject.has("external_path")) {
                 String checkpointPath = completedObject.getString("external_path");
                 LOGGER.debug("Latest checkpoint path: {}", checkpointPath);
-                if (!StringUtils.isBlank(checkpointPath)) {
+                if (!StringUtils.isBlank(checkpointPath) && !checkpointPath.equals(latestCheckpointPath)) {
                   Map<String, String> config = new HashMap<>();
                   config.put(LATEST_CHECKPOINT_PATH, checkpointPath);
                   context.getIntpEventClient().updateParagraphConfig(
                           context.getNoteId(), context.getParagraphId(), config);
+                  latestCheckpointPath = checkpointPath;
                 }
               }
             }