You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2022/07/01 03:35:31 UTC
[flink] 03/03: [FLINK-28309][rest] Introduce metrics of the duration that a task stays in each status
This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bb8e1d14f05aca186ec874437eba3d44fbb3bd97
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Tue May 24 11:25:17 2022 +0800
[FLINK-28309][rest] Introduce metrics of the duration that a task stays in each status
This closes #20111.
---
.../shortcodes/generated/rest_v1_dispatcher.html | 18 +++++
docs/static/generated/rest_v1_dispatcher.yml | 5 ++
.../src/test/resources/rest_api_v1.snapshot | 18 +++++
.../runtime/executiongraph/AccessExecution.java | 15 +++++
.../runtime/executiongraph/ArchivedExecution.java | 19 +++++-
.../flink/runtime/executiongraph/Execution.java | 29 +++++++-
.../job/SubtaskExecutionAttemptDetailsInfo.java | 77 ++++++++++++++++++++--
.../executiongraph/ArchivedExecutionGraphTest.java | 30 +++++++++
.../executiongraph/ExecutionHistoryTest.java | 1 +
.../rest/handler/job/JobExceptionsHandlerTest.java | 4 +-
.../SubtaskCurrentAttemptDetailsHandlerTest.java | 16 ++++-
...askExecutionAttemptAccumulatorsHandlerTest.java | 1 +
.../SubtaskExecutionAttemptDetailsHandlerTest.java | 12 +++-
.../rest/messages/JobVertexDetailsInfoTest.java | 10 ++-
.../SubtaskExecutionAttemptDetailsInfoTest.java | 12 +++-
.../exceptionhistory/TestingAccessExecution.java | 10 +++
16 files changed, 260 insertions(+), 17 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 4b648e77e72..3f56fcef74b 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3712,6 +3712,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
+ "status-duration" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ },
"subtask" : {
"type" : "integer"
},
@@ -4406,6 +4412,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
+ "status-duration" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ },
"subtask" : {
"type" : "integer"
},
@@ -4543,6 +4555,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
+ "status-duration" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ },
"subtask" : {
"type" : "integer"
},
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 259d5b6d9ba..4b85119cd68 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -1687,6 +1687,11 @@ components:
$ref: '#/components/schemas/IOMetricsInfo'
taskmanager-id:
type: string
+ status-duration:
+ type: object
+ additionalProperties:
+ type: integer
+ format: int64
JobResult:
type: object
properties:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 99cd6fb0cab..30cbe20f7d1 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2191,6 +2191,12 @@
"taskmanager-id" : {
"type" : "string"
},
+ "status-duration" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ },
"start_time" : {
"type" : "integer"
}
@@ -2561,6 +2567,12 @@
"taskmanager-id" : {
"type" : "string"
},
+ "status-duration" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ },
"start_time" : {
"type" : "integer"
}
@@ -2656,6 +2668,12 @@
"taskmanager-id" : {
"type" : "string"
},
+ "status-duration" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ },
"start_time" : {
"type" : "integer"
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
index b0da913b4dd..c8a5dcf7f00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -47,6 +47,13 @@ public interface AccessExecution {
*/
long[] getStateTimestamps();
+ /**
+ * Returns the end timestamps for every {@link ExecutionState}.
+ *
+ * @return end timestamps for each state, indexed by {@link ExecutionState#ordinal()}
+ */
+ long[] getStateEndTimestamps();
+
/**
* Returns the current {@link ExecutionState} for this execution.
*
@@ -79,6 +86,14 @@ public interface AccessExecution {
*/
long getStateTimestamp(ExecutionState state);
+ /**
+ * Returns the end timestamp for the given {@link ExecutionState}.
+ *
+ * @param state state for which the end timestamp should be returned
+ * @return end timestamp (time the execution left the state) for the given state
+ */
+ long getStateEndTimestamp(ExecutionState state);
+
/**
* Returns the user-defined accumulators as strings.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index a270d072c4d..7cb2d8a39da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -37,6 +37,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
private final long[] stateTimestamps;
+ private final long[] stateEndTimestamps;
+
private final ExecutionState state;
@Nullable private final ErrorInfo failureInfo; // once assigned, never changes
@@ -59,7 +61,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
execution.getFailureInfo().orElse(null),
execution.getAssignedResourceLocation(),
execution.getAssignedAllocationID(),
- execution.getStateTimestamps());
+ execution.getStateTimestamps(),
+ execution.getStateEndTimestamps());
}
public ArchivedExecution(
@@ -70,7 +73,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
@Nullable ErrorInfo failureCause,
TaskManagerLocation assignedResourceLocation,
AllocationID assignedAllocationID,
- long[] stateTimestamps) {
+ long[] stateTimestamps,
+ long[] stateEndTimestamps) {
this.userAccumulators = userAccumulators;
this.ioMetrics = ioMetrics;
this.failureInfo = failureCause;
@@ -78,6 +82,7 @@ public class ArchivedExecution implements AccessExecution, Serializable {
this.attemptId = attemptId;
this.state = state;
this.stateTimestamps = stateTimestamps;
+ this.stateEndTimestamps = stateEndTimestamps;
this.assignedAllocationID = assignedAllocationID;
}
@@ -100,6 +105,11 @@ public class ArchivedExecution implements AccessExecution, Serializable {
return stateTimestamps;
}
+ @Override
+ public long[] getStateEndTimestamps() {
+ return stateEndTimestamps;
+ }
+
@Override
public ExecutionState getState() {
return state;
@@ -124,6 +134,11 @@ public class ArchivedExecution implements AccessExecution, Serializable {
return this.stateTimestamps[state.ordinal()];
}
+ @Override
+ public long getStateEndTimestamp(ExecutionState state) {
+ return this.stateEndTimestamps[state.ordinal()];
+ }
+
@Override
public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
return userAccumulators;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 79c2cb9602f..459d9861980 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -133,6 +133,12 @@ public class Execution
*/
private final long[] stateTimestamps;
+ /**
+ * The end timestamps when state transitions occurred, indexed by {@link
+ * ExecutionState#ordinal()}.
+ */
+ private final long[] stateEndTimestamps;
+
private final Time rpcTimeout;
private final Collection<PartitionInfo> partitionInfos;
@@ -211,6 +217,7 @@ public class Execution
this.rpcTimeout = checkNotNull(rpcTimeout);
this.stateTimestamps = new long[ExecutionState.values().length];
+ this.stateEndTimestamps = new long[ExecutionState.values().length];
markTimestamp(CREATED, startTimestamp);
this.partitionInfos = new ArrayList<>(16);
@@ -337,11 +344,21 @@ public class Execution
return stateTimestamps;
}
+ @Override
+ public long[] getStateEndTimestamps() {
+ return stateEndTimestamps;
+ }
+
@Override
public long getStateTimestamp(ExecutionState state) {
return this.stateTimestamps[state.ordinal()];
}
+ @Override
+ public long getStateEndTimestamp(ExecutionState state) {
+ return this.stateEndTimestamps[state.ordinal()];
+ }
+
public boolean isFinished() {
return state.isTerminal();
}
@@ -1405,7 +1422,7 @@ public class Execution
if (state == currentState) {
state = targetState;
- markTimestamp(targetState);
+ markTimestamp(currentState, targetState);
if (error == null) {
LOG.info(
@@ -1454,14 +1471,20 @@ public class Execution
}
}
- private void markTimestamp(ExecutionState state) {
- markTimestamp(state, System.currentTimeMillis());
+ private void markTimestamp(ExecutionState currentState, ExecutionState targetState) {
+ long now = System.currentTimeMillis();
+ markTimestamp(targetState, now);
+ markEndTimestamp(currentState, now);
}
private void markTimestamp(ExecutionState state, long timestamp) {
this.stateTimestamps[state.ordinal()] = timestamp;
}
+ private void markEndTimestamp(ExecutionState state, long timestamp) {
+ this.stateEndTimestamps[state.ordinal()] = timestamp;
+ }
+
public String getVertexWithAttempt() {
return vertex.getTaskNameWithSubtaskIndex() + " - execution #" + getAttemptNumber();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 637538f1f48..a1a3115369c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -36,6 +36,8 @@ import io.swagger.v3.oas.annotations.Hidden;
import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
/** The sub task execution attempt response. */
@@ -61,6 +63,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
public static final String FIELD_NAME_TASKMANAGER_ID = "taskmanager-id";
+ public static final String FIELD_NAME_STATUS_DURATION = "status-duration";
+
@JsonProperty(FIELD_NAME_SUBTASK_INDEX)
private final int subtaskIndex;
@@ -92,6 +96,9 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_TASKMANAGER_ID)
private final String taskmanagerId;
+ @JsonProperty(FIELD_NAME_STATUS_DURATION)
+ private final Map<ExecutionState, Long> statusDuration;
+
@JsonCreator
public SubtaskExecutionAttemptDetailsInfo(
@JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
@@ -102,7 +109,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_END_TIME) long endTime,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo,
- @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId) {
+ @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId,
+ @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration) {
this.subtaskIndex = subtaskIndex;
this.status = Preconditions.checkNotNull(status);
@@ -114,6 +122,7 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
this.duration = duration;
this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo);
this.taskmanagerId = Preconditions.checkNotNull(taskmanagerId);
+ this.statusDuration = Preconditions.checkNotNull(statusDuration);
}
public int getSubtaskIndex() {
@@ -148,6 +157,14 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
return duration;
}
+ public Map<ExecutionState, Long> getStatusDuration() {
+ return statusDuration;
+ }
+
+    public long getStatusDuration(ExecutionState state) {
+        return statusDuration.getOrDefault(state, -1L); // -1 when the state is not tracked
+    }
+
public IOMetricsInfo getIoMetricsInfo() {
return ioMetricsInfo;
}
@@ -176,7 +193,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
&& endTime == that.endTime
&& duration == that.duration
&& Objects.equals(ioMetricsInfo, that.ioMetricsInfo)
- && Objects.equals(taskmanagerId, that.taskmanagerId);
+ && Objects.equals(taskmanagerId, that.taskmanagerId)
+ && Objects.equals(statusDuration, that.statusDuration);
}
@Override
@@ -191,7 +209,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
endTime,
duration,
ioMetricsInfo,
- taskmanagerId);
+ taskmanagerId,
+ statusDuration);
}
public static SubtaskExecutionAttemptDetailsInfo create(
@@ -240,6 +259,56 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
endTime,
duration,
ioMetricsInfo,
- taskmanagerId);
+ taskmanagerId,
+ getExecutionStateDuration(execution));
+ }
+
+ private static Map<ExecutionState, Long> getExecutionStateDuration(AccessExecution execution) {
+ Map<ExecutionState, Long> executionStateDuration = new HashMap<>();
+ long now = System.currentTimeMillis();
+ ExecutionState state = execution.getState();
+ executionStateDuration.put(
+ ExecutionState.CREATED,
+ calculateStateDuration(
+ execution.getStateTimestamp(ExecutionState.CREATED),
+ state == ExecutionState.CREATED
+ ? now
+ : execution.getStateEndTimestamp(ExecutionState.CREATED)));
+ executionStateDuration.put(
+ ExecutionState.SCHEDULED,
+ calculateStateDuration(
+ execution.getStateTimestamp(ExecutionState.SCHEDULED),
+ state == ExecutionState.SCHEDULED
+ ? now
+ : execution.getStateEndTimestamp(ExecutionState.SCHEDULED)));
+ executionStateDuration.put(
+ ExecutionState.DEPLOYING,
+ calculateStateDuration(
+ execution.getStateTimestamp(ExecutionState.DEPLOYING),
+ state == ExecutionState.DEPLOYING
+ ? now
+ : execution.getStateEndTimestamp(ExecutionState.DEPLOYING)));
+ executionStateDuration.put(
+ ExecutionState.INITIALIZING,
+ calculateStateDuration(
+ execution.getStateTimestamp(ExecutionState.INITIALIZING),
+ state == ExecutionState.INITIALIZING
+ ? now
+ : execution.getStateEndTimestamp(ExecutionState.INITIALIZING)));
+ executionStateDuration.put(
+ ExecutionState.RUNNING,
+ calculateStateDuration(
+ execution.getStateTimestamp(ExecutionState.RUNNING),
+ state == ExecutionState.RUNNING
+ ? now
+ : execution.getStateEndTimestamp(ExecutionState.RUNNING)));
+ return executionStateDuration;
+ }
+
+ private static long calculateStateDuration(long start, long end) {
+ if (start == 0 || end == 0) {
+ return -1;
+ }
+ return end - start;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 6819ef941c7..b6e9cefc2b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -435,6 +435,9 @@ public class ArchivedExecutionGraphTest extends TestLogger {
assertEquals(runtimeExecution.getAttemptNumber(), archivedExecution.getAttemptNumber());
assertArrayEquals(
runtimeExecution.getStateTimestamps(), archivedExecution.getStateTimestamps());
+ assertArrayEquals(
+ runtimeExecution.getStateEndTimestamps(),
+ archivedExecution.getStateEndTimestamps());
assertEquals(runtimeExecution.getState(), archivedExecution.getState());
assertEquals(
runtimeExecution.getAssignedResourceLocation(),
@@ -472,6 +475,33 @@ public class ArchivedExecutionGraphTest extends TestLogger {
assertEquals(
runtimeExecution.getStateTimestamp(ExecutionState.FAILED),
archivedExecution.getStateTimestamp(ExecutionState.FAILED));
+ assertEquals(
+ runtimeExecution.getStateEndTimestamp(ExecutionState.CREATED),
+ archivedExecution.getStateEndTimestamp(ExecutionState.CREATED));
+ assertEquals(
+ runtimeExecution.getStateEndTimestamp(ExecutionState.SCHEDULED),
+ archivedExecution.getStateEndTimestamp(ExecutionState.SCHEDULED));
+ assertEquals(
+ runtimeExecution.getStateEndTimestamp(ExecutionState.DEPLOYING),
+ archivedExecution.getStateEndTimestamp(ExecutionState.DEPLOYING));
+ assertEquals(
+ runtimeExecution.getStateEndTimestamp(ExecutionState.INITIALIZING),
+ archivedExecution.getStateEndTimestamp(ExecutionState.INITIALIZING));
+ assertEquals(
+ runtimeExecution.getStateEndTimestamp(ExecutionState.RUNNING),
+ archivedExecution.getStateEndTimestamp(ExecutionState.RUNNING));
+ assertEquals(
+ runtimeExecution.getStateEndTimestamp(ExecutionState.FINISHED),
+ archivedExecution.getStateEndTimestamp(ExecutionState.FINISHED));
+ assertEquals(
+ runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELING),
+ archivedExecution.getStateEndTimestamp(ExecutionState.CANCELING));
+ assertEquals(
+ runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELED),
+ archivedExecution.getStateEndTimestamp(ExecutionState.CANCELED));
+ assertEquals(
+ runtimeExecution.getStateEndTimestamp(ExecutionState.FAILED),
+ archivedExecution.getStateEndTimestamp(ExecutionState.FAILED));
compareStringifiedAccumulators(
runtimeExecution.getUserAccumulatorsStringified(),
archivedExecution.getUserAccumulatorsStringified());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java
index c719fecf860..3f1c54b1238 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java
@@ -80,6 +80,7 @@ class ExecutionHistoryTest {
null,
null,
null,
+ new long[ExecutionState.values().length],
new long[ExecutionState.values().length]);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index b5eab9a20a2..93d3cf9c99d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -330,6 +330,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
final StringifiedAccumulatorResult[] emptyAccumulators =
new StringifiedAccumulatorResult[0];
final long[] timestamps = new long[ExecutionState.values().length];
+ final long[] endTimestamps = new long[ExecutionState.values().length];
final ExecutionState expectedState = ExecutionState.RUNNING;
final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation();
@@ -352,7 +353,8 @@ public class JobExceptionsHandlerTest extends TestLogger {
System.currentTimeMillis()),
assignedResourceLocation,
allocationID,
- timestamps),
+ timestamps,
+ endTimestamps),
new ExecutionHistory(0))
},
jobVertexID,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 483ad2facf1..4c267329a74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -50,6 +50,7 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Map;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertEquals;
@@ -87,7 +88,9 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
accumulateBackPressuredTime);
final long[] timestamps = new long[ExecutionState.values().length];
+ final long[] endTimestamps = new long[ExecutionState.values().length];
timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs;
+ endTimestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs + 10;
final ExecutionState expectedState = ExecutionState.FINISHED;
timestamps[expectedState.ordinal()] = finishedTs;
@@ -106,7 +109,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
null,
assignedResourceLocation,
allocationID,
- timestamps);
+ timestamps,
+ endTimestamps);
final ArchivedExecutionVertex executionVertex =
new ArchivedExecutionVertex(
@@ -170,6 +174,13 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
accumulateIdleTime,
accumulateBusyTime);
+ final Map<ExecutionState, Long> statusDuration = new HashMap<>();
+ statusDuration.put(ExecutionState.CREATED, -1L);
+ statusDuration.put(ExecutionState.SCHEDULED, -1L);
+ statusDuration.put(ExecutionState.DEPLOYING, 10L);
+ statusDuration.put(ExecutionState.INITIALIZING, -1L);
+ statusDuration.put(ExecutionState.RUNNING, -1L);
+
final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
new SubtaskExecutionAttemptDetailsInfo(
subtaskIndex,
@@ -180,7 +191,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
finishedTs,
finishedTs - deployingTs,
ioMetricsInfo,
- assignedResourceLocation.getResourceID().getResourceIdString());
+ assignedResourceLocation.getResourceID().getResourceIdString(),
+ statusDuration);
assertEquals(expectedDetailsInfo, detailsInfo);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index e5d98dbf8f4..bf1b9561e43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -99,6 +99,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
null,
null,
null,
+ new long[ExecutionState.values().length],
new long[ExecutionState.values().length]);
// Invoke tested method.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 4a288e738be..a044f3f87ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Map;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.junit.Assert.assertEquals;
@@ -107,6 +108,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
null,
null,
null,
+ new long[ExecutionState.values().length],
new long[ExecutionState.values().length]),
new ExecutionHistory(0))
},
@@ -175,6 +177,13 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
accumulateIdleTime,
accumulateBusyTime);
+ final Map<ExecutionState, Long> statusDuration = new HashMap<>();
+ statusDuration.put(ExecutionState.CREATED, -1L);
+ statusDuration.put(ExecutionState.SCHEDULED, -1L);
+ statusDuration.put(ExecutionState.DEPLOYING, -1L);
+ statusDuration.put(ExecutionState.INITIALIZING, -1L);
+ statusDuration.put(ExecutionState.RUNNING, -1L);
+
final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
new SubtaskExecutionAttemptDetailsInfo(
subtaskIndex,
@@ -185,7 +194,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
0L,
-1L,
ioMetricsInfo,
- "(unassigned)");
+ "(unassigned)",
+ statusDuration);
assertEquals(expectedDetailsInfo, detailsInfo);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
index 806ce560494..50c0865f85c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -62,7 +63,8 @@ public class JobVertexDetailsInfoTest
System.currentTimeMillis(),
1L,
jobVertexMetrics,
- "taskmanagerId1"));
+ "taskmanagerId1",
+ Collections.singletonMap(ExecutionState.CREATED, 10L)));
vertexTaskDetailList.add(
new SubtaskExecutionAttemptDetailsInfo(
1,
@@ -73,7 +75,8 @@ public class JobVertexDetailsInfoTest
System.currentTimeMillis(),
1L,
jobVertexMetrics,
- "taskmanagerId2"));
+ "taskmanagerId2",
+ Collections.singletonMap(ExecutionState.CREATED, 10L)));
vertexTaskDetailList.add(
new SubtaskExecutionAttemptDetailsInfo(
2,
@@ -84,7 +87,8 @@ public class JobVertexDetailsInfoTest
System.currentTimeMillis(),
1L,
jobVertexMetrics,
- "taskmanagerId3"));
+ "taskmanagerId3",
+ Collections.singletonMap(ExecutionState.CREATED, 10L)));
int parallelism = 1 + (random.nextInt() / 3);
return new JobVertexDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
index f022d6ced26..48158c05b56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
/** Tests (un)marshalling of the {@link SubtaskExecutionAttemptDetailsInfo}. */
@@ -51,6 +53,13 @@ public class SubtaskExecutionAttemptDetailsInfoTest
Math.abs(random.nextLong()),
Math.abs(random.nextDouble()));
+ final Map<ExecutionState, Long> statusDuration = new HashMap<>();
+ statusDuration.put(ExecutionState.CREATED, 10L);
+ statusDuration.put(ExecutionState.SCHEDULED, 20L);
+ statusDuration.put(ExecutionState.DEPLOYING, 30L);
+ statusDuration.put(ExecutionState.INITIALIZING, 40L);
+ statusDuration.put(ExecutionState.RUNNING, 50L);
+
return new SubtaskExecutionAttemptDetailsInfo(
Math.abs(random.nextInt()),
ExecutionState.values()[random.nextInt(ExecutionState.values().length)],
@@ -60,6 +69,7 @@ public class SubtaskExecutionAttemptDetailsInfoTest
Math.abs(random.nextLong()),
Math.abs(random.nextLong()),
ioMetricsInfo,
- "taskmanagerId");
+ "taskmanagerId",
+ statusDuration);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java
index f3ec6fe2f1d..5173ba7c2d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java
@@ -77,6 +77,11 @@ public class TestingAccessExecution implements AccessExecution {
throw new UnsupportedOperationException("getStateTimestamps should not be called.");
}
+    @Override
+    public long[] getStateEndTimestamps() {
+        throw new UnsupportedOperationException("getStateEndTimestamps should not be called.");
+    }
+
@Override
public ExecutionState getState() {
return state;
@@ -87,6 +92,11 @@ public class TestingAccessExecution implements AccessExecution {
throw new UnsupportedOperationException("getStateTimestamp should not be called.");
}
+    @Override
+    public long getStateEndTimestamp(ExecutionState state) {
+        throw new UnsupportedOperationException("getStateEndTimestamp should not be called.");
+    }
+
@Override
public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
throw new UnsupportedOperationException(