You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:25 UTC

[27/30] flink git commit: [FLINK-7941] Store timestamps indexed by ExecutionState in SubtasksTimesInfo

[FLINK-7941] Store timestamps indexed by ExecutionState in SubtasksTimesInfo


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26e3d376
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26e3d376
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26e3d376

Branch: refs/heads/master
Commit: 26e3d3765a7bab2f4cb517476dfcb7e9c1b2ae30
Parents: 712d4cf
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 3 18:59:19 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:45 2017 +0100

----------------------------------------------------------------------
 .../rest/handler/job/SubtasksTimesHandler.java  |  4 +--
 .../rest/messages/SubtasksTimesInfo.java        |  5 ++--
 .../rest/messages/SubtasksTimesInfoTest.java    | 26 +++++++++++---------
 3 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index feae3ab..bc72e51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -89,9 +89,9 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphHandler<Subtasks
 			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
 			String locationString = location == null ? "(unassigned)" : location.getHostname();
 
-			Map<String, Long> timestampMap = new HashMap<>();
+			Map<ExecutionState, Long> timestampMap = new HashMap<>(ExecutionState.values().length);
 			for (ExecutionState state : ExecutionState.values()) {
-				timestampMap.put(state.name(), timestamps[state.ordinal()]);
+				timestampMap.put(state, timestamps[state.ordinal()]);
 			}
 
 			subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(

http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
index d97a0d0..a0edf7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages;
 
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -109,13 +110,13 @@ public class SubtasksTimesInfo implements ResponseBody {
 		private final long duration;
 
 		@JsonProperty(FIELD_NAME_TIMESTAMPS)
-		private final Map<String, Long> timestamps;
+		private final Map<ExecutionState, Long> timestamps;
 
 		public SubtaskTimeInfo(
 				@JsonProperty(FIELD_NAME_SUBTASK) int subtask,
 				@JsonProperty(FIELD_NAME_HOST) String host,
 				@JsonProperty(FIELD_NAME_DURATION) long duration,
-				@JsonProperty(FIELD_NAME_TIMESTAMPS) Map<String, Long> timestamps) {
+				@JsonProperty(FIELD_NAME_TIMESTAMPS) Map<ExecutionState, Long> timestamps) {
 			this.subtask = subtask;
 			this.host = checkNotNull(host);
 			this.duration = duration;

http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
index 82eb21b..cbe5409 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.rest.messages;
 
+import org.apache.flink.runtime.execution.ExecutionState;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,22 +39,22 @@ public class SubtasksTimesInfoTest extends RestResponseMarshallingTestBase<Subta
 	protected SubtasksTimesInfo getTestResponseInstance() throws Exception {
 		List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>();
 
-		Map<String, Long> subTimeMap1 = new HashMap<>();
-		subTimeMap1.put("state11", System.currentTimeMillis());
-		subTimeMap1.put("state12", System.currentTimeMillis());
-		subTimeMap1.put("state13", System.currentTimeMillis());
+		Map<ExecutionState, Long> subTimeMap1 = new HashMap<>();
+		subTimeMap1.put(ExecutionState.RUNNING, 1L);
+		subTimeMap1.put(ExecutionState.FAILED, 2L);
+		subTimeMap1.put(ExecutionState.CANCELED, 3L);
 		subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(0, "local1", 1L, subTimeMap1));
 
-		Map<String, Long> subTimeMap2 = new HashMap<>();
-		subTimeMap1.put("state21", System.currentTimeMillis());
-		subTimeMap1.put("state22", System.currentTimeMillis());
-		subTimeMap1.put("state23", System.currentTimeMillis());
+		Map<ExecutionState, Long> subTimeMap2 = new HashMap<>();
+		subTimeMap2.put(ExecutionState.RUNNING, 4L);
+		subTimeMap2.put(ExecutionState.FAILED, 5L);
+		subTimeMap2.put(ExecutionState.CANCELED, 6L);
 		subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(1, "local2", 2L, subTimeMap2));
 
-		Map<String, Long> subTimeMap3 = new HashMap<>();
-		subTimeMap1.put("state31", System.currentTimeMillis());
-		subTimeMap1.put("state32", System.currentTimeMillis());
-		subTimeMap1.put("state33", System.currentTimeMillis());
+		Map<ExecutionState, Long> subTimeMap3 = new HashMap<>();
+		subTimeMap3.put(ExecutionState.SCHEDULED, 1L);
+		subTimeMap3.put(ExecutionState.FAILED, 2L);
+		subTimeMap3.put(ExecutionState.CANCELING, 3L);
 		subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(2, "local3", 3L, subTimeMap3));
 
 		return new SubtasksTimesInfo("testId", "testName", System.currentTimeMillis(), subtasks);