You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/04 12:44:21 UTC

[3/7] flink git commit: [FLINK-3709] Prevent caching of outdated suspended ExecutionGraphs

[FLINK-3709] Prevent caching of outdated suspended ExecutionGraphs

This closes #3709.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49d0e432
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49d0e432
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49d0e432

Branch: refs/heads/master
Commit: 49d0e4321df1853d00e26593556c34acb9bed3d2
Parents: 6aa5bad
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Tue Apr 11 19:48:52 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/webmonitor/ExecutionGraphHolder.java      | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49d0e432/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 3d0cfc0..f9faa85 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
 import org.slf4j.Logger;
@@ -67,14 +68,18 @@ public class ExecutionGraphHolder {
 	public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
 		AccessExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
-			return cached;
+			if (cached.getState() == JobStatus.SUSPENDED) {
+				cache.remove(jid);
+			} else {
+				return cached;
+			}
 		}
 
 		try {
 			if (jobManager != null) {
 				Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
 				Object result = Await.result(future, timeout);
-				
+
 				if (result instanceof JobManagerMessages.JobNotFound) {
 					return null;
 				}