You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/02 08:50:51 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5374] Don't update
paragraph config when latest checkpoint of flink is unchanged
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 3dff78c [ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged
3dff78c is described below
commit 3dff78c87ffbfe6575afb4642b54e2cc14cc4aa1
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon May 17 22:03:21 2021 +0800
[ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged
### What is this PR for?
Simple PR that checks whether the latest Flink checkpoint has changed and updates the paragraph config only when it has.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5374
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4120 from zjffdu/ZEPPELIN-5374 and squashes the following commits:
79f269f24e [Jeff Zhang] [ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged
(cherry picked from commit b173590a62144bfa20d4822b99283677edd61c16)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../src/main/java/org/apache/zeppelin/flink/JobManager.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 5f40569..7242b04 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -188,6 +188,7 @@ public class JobManager {
private AtomicBoolean running = new AtomicBoolean(true);
private boolean isFirstPoll = true;
private long checkInterval;
+ private String latestCheckpointPath;
FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context, long checkInterval) {
this.flinkWebUrl = flinkWebUrl;
@@ -253,11 +254,12 @@ public class JobManager {
if (completedObject.has("external_path")) {
String checkpointPath = completedObject.getString("external_path");
LOGGER.debug("Latest checkpoint path: {}", checkpointPath);
- if (!StringUtils.isBlank(checkpointPath)) {
+ if (!StringUtils.isBlank(checkpointPath) && !checkpointPath.equals(latestCheckpointPath)) {
Map<String, String> config = new HashMap<>();
config.put(LATEST_CHECKPOINT_PATH, checkpointPath);
context.getIntpEventClient().updateParagraphConfig(
context.getNoteId(), context.getParagraphId(), config);
+ latestCheckpointPath = checkpointPath;
}
}
}