You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:25 UTC
[27/30] flink git commit: [FLINK-7941] Store timestamps indexed by
ExecutionState in SubtasksTimesInfo
[FLINK-7941] Store timestamps indexed by ExecutionState in SubtasksTimesInfo
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26e3d376
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26e3d376
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26e3d376
Branch: refs/heads/master
Commit: 26e3d3765a7bab2f4cb517476dfcb7e9c1b2ae30
Parents: 712d4cf
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 3 18:59:19 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:45 2017 +0100
----------------------------------------------------------------------
.../rest/handler/job/SubtasksTimesHandler.java | 4 +--
.../rest/messages/SubtasksTimesInfo.java | 5 ++--
.../rest/messages/SubtasksTimesInfoTest.java | 26 +++++++++++---------
3 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index feae3ab..bc72e51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -89,9 +89,9 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphHandler<Subtasks
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();
- Map<String, Long> timestampMap = new HashMap<>();
+ Map<ExecutionState, Long> timestampMap = new HashMap<>(ExecutionState.values().length);
for (ExecutionState state : ExecutionState.values()) {
- timestampMap.put(state.name(), timestamps[state.ordinal()]);
+ timestampMap.put(state, timestamps[state.ordinal()]);
}
subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(
http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
index d97a0d0..a0edf7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest.messages;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -109,13 +110,13 @@ public class SubtasksTimesInfo implements ResponseBody {
private final long duration;
@JsonProperty(FIELD_NAME_TIMESTAMPS)
- private final Map<String, Long> timestamps;
+ private final Map<ExecutionState, Long> timestamps;
public SubtaskTimeInfo(
@JsonProperty(FIELD_NAME_SUBTASK) int subtask,
@JsonProperty(FIELD_NAME_HOST) String host,
@JsonProperty(FIELD_NAME_DURATION) long duration,
- @JsonProperty(FIELD_NAME_TIMESTAMPS) Map<String, Long> timestamps) {
+ @JsonProperty(FIELD_NAME_TIMESTAMPS) Map<ExecutionState, Long> timestamps) {
this.subtask = subtask;
this.host = checkNotNull(host);
this.duration = duration;
http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
index 82eb21b..cbe5409 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.rest.messages;
+import org.apache.flink.runtime.execution.ExecutionState;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -37,22 +39,22 @@ public class SubtasksTimesInfoTest extends RestResponseMarshallingTestBase<Subta
protected SubtasksTimesInfo getTestResponseInstance() throws Exception {
List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>();
- Map<String, Long> subTimeMap1 = new HashMap<>();
- subTimeMap1.put("state11", System.currentTimeMillis());
- subTimeMap1.put("state12", System.currentTimeMillis());
- subTimeMap1.put("state13", System.currentTimeMillis());
+ Map<ExecutionState, Long> subTimeMap1 = new HashMap<>();
+ subTimeMap1.put(ExecutionState.RUNNING, 1L);
+ subTimeMap1.put(ExecutionState.FAILED, 2L);
+ subTimeMap1.put(ExecutionState.CANCELED, 3L);
subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(0, "local1", 1L, subTimeMap1));
- Map<String, Long> subTimeMap2 = new HashMap<>();
- subTimeMap1.put("state21", System.currentTimeMillis());
- subTimeMap1.put("state22", System.currentTimeMillis());
- subTimeMap1.put("state23", System.currentTimeMillis());
+ Map<ExecutionState, Long> subTimeMap2 = new HashMap<>();
+ subTimeMap2.put(ExecutionState.RUNNING, 4L);
+ subTimeMap2.put(ExecutionState.FAILED, 5L);
+ subTimeMap2.put(ExecutionState.CANCELED, 6L);
subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(1, "local2", 2L, subTimeMap2));
- Map<String, Long> subTimeMap3 = new HashMap<>();
- subTimeMap1.put("state31", System.currentTimeMillis());
- subTimeMap1.put("state32", System.currentTimeMillis());
- subTimeMap1.put("state33", System.currentTimeMillis());
+ Map<ExecutionState, Long> subTimeMap3 = new HashMap<>();
+ subTimeMap3.put(ExecutionState.SCHEDULED, 1L);
+ subTimeMap3.put(ExecutionState.FAILED, 2L);
+ subTimeMap3.put(ExecutionState.CANCELING, 3L);
subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(2, "local3", 3L, subTimeMap3));
return new SubtasksTimesInfo("testId", "testName", System.currentTimeMillis(), subtasks);