You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/04 12:44:21 UTC
[3/7] flink git commit: [FLINK-3709] Prevent caching of outdated suspended ExecutionGraphs
[FLINK-3709] Prevent caching of outdated suspended ExecutionGraphs
This closes #3709.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49d0e432
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49d0e432
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49d0e432
Branch: refs/heads/master
Commit: 49d0e4321df1853d00e26593556c34acb9bed3d2
Parents: 6aa5bad
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Tue Apr 11 19:48:52 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/webmonitor/ExecutionGraphHolder.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/49d0e432/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 3d0cfc0..f9faa85 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.slf4j.Logger;
@@ -67,14 +68,18 @@ public class ExecutionGraphHolder {
public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
AccessExecutionGraph cached = cache.get(jid);
if (cached != null) {
- return cached;
+ if (cached.getState() == JobStatus.SUSPENDED) {
+ cache.remove(jid);
+ } else {
+ return cached;
+ }
}
try {
if (jobManager != null) {
Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
Object result = Await.result(future, timeout);
-
+
if (result instanceof JobManagerMessages.JobNotFound) {
return null;
}