You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:58 UTC
[flink] 05/06: [FLINK-28588][rest] MetricStore supports to store and query metrics of multiple execution attempts of a subtask.
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a48fd53bd317ac2102adb00c1209350b57a687e
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:54:10 2022 +0800
[FLINK-28588][rest] MetricStore supports to store and query metrics of multiple execution attempts of a subtask.
---
.../history/HistoryServerArchiveFetcher.java | 3 +-
.../runtime/messages/webmonitor/JobDetails.java | 98 ++++++++++-
.../metrics/dump/MetricDumpSerialization.java | 10 +-
.../flink/runtime/metrics/dump/QueryScopeInfo.java | 28 +++-
.../groups/InternalOperatorMetricGroup.java | 1 +
.../runtime/metrics/groups/TaskMetricGroup.java | 5 +-
.../handler/legacy/metrics/MetricFetcherImpl.java | 1 +
.../rest/handler/legacy/metrics/MetricStore.java | 186 +++++++++++++++++----
.../rest/handler/util/MutableIOMetrics.java | 7 +-
.../messages/webmonitor/JobDetailsTest.java | 31 ++++
.../metrics/dump/MetricDumpSerializerTest.java | 5 +-
.../runtime/metrics/dump/QueryScopeInfoTest.java | 10 +-
.../job/JobVertexBackPressureHandlerTest.java | 7 +-
.../AggregatingSubtasksMetricsHandlerTest.java | 6 +-
.../job/metrics/JobVertexMetricsHandlerTest.java | 4 +-
.../job/metrics/SubtaskMetricsHandlerTest.java | 4 +-
.../handler/legacy/metrics/MetricFetcherTest.java | 5 +-
.../handler/legacy/metrics/MetricStoreTest.java | 61 ++++++-
18 files changed, 408 insertions(+), 64 deletions(-)
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index eb5a34b2c2d..8e5aee9c633 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -425,7 +425,8 @@ class HistoryServerArchiveFetcher {
state,
lastMod,
tasksPerState,
- numTasks);
+ numTasks,
+ new HashMap<>());
MultipleJobsDetails multipleJobsDetails =
new MultipleJobsDetails(Collections.singleton(jobDetails));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index f9b609ac1cf..74b2964228c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.messages.webmonitor;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -39,6 +40,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,6 +60,8 @@ public class JobDetails implements Serializable {
private static final String FIELD_NAME_STATUS = "state";
private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification";
private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total";
+ private static final String FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS =
+ "current-execution-attempts";
private final JobID jobId;
@@ -76,6 +81,15 @@ public class JobDetails implements Serializable {
private final int numTasks;
+ /**
+ * The attempt number of the current execution attempt of each subtask, i.e. the attempt that is
+ * considered the representative execution of that subtask. The nesting is JobVertexID (as a
+ * string) -> SubtaskIndex -> CurrentExecutionAttemptNumber. It is consumed by the metric
+ * fetcher to decide which attempt's metrics are accumulated as the metrics of the subtask.
+ *
+ * <p>{@link #createDetailsForJob} only records subtasks that have more than one current
+ * execution, and only vertices with at least one such subtask; a subtask without an entry is
+ * assumed to have a single execution.
+ */
+ private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;
+
+ @VisibleForTesting
public JobDetails(
JobID jobId,
String jobName,
@@ -86,7 +100,30 @@ public class JobDetails implements Serializable {
long lastUpdateTime,
int[] tasksPerState,
int numTasks) {
+ this(
+ jobId,
+ jobName,
+ startTime,
+ endTime,
+ duration,
+ status,
+ lastUpdateTime,
+ tasksPerState,
+ numTasks,
+ new HashMap<>());
+ }
+ public JobDetails(
+ JobID jobId,
+ String jobName,
+ long startTime,
+ long endTime,
+ long duration,
+ JobStatus status,
+ long lastUpdateTime,
+ int[] tasksPerState,
+ int numTasks,
+ Map<String, Map<Integer, Integer>> currentExecutionAttempts) {
this.jobId = checkNotNull(jobId);
this.jobName = checkNotNull(jobName);
this.startTime = startTime;
@@ -100,6 +137,7 @@ public class JobDetails implements Serializable {
ExecutionState.values().length);
this.tasksPerState = checkNotNull(tasksPerState);
this.numTasks = numTasks;
+ this.currentExecutionAttempts = checkNotNull(currentExecutionAttempts);
}
public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
@@ -112,16 +150,27 @@ public class JobDetails implements Serializable {
int[] countsPerStatus = new int[ExecutionState.values().length];
long lastChanged = 0;
int numTotalTasks = 0;
+ Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
AccessExecutionVertex[] taskVertices = ejv.getTaskVertices();
numTotalTasks += taskVertices.length;
+ Map<Integer, Integer> vertexAttempts = new HashMap<>();
for (AccessExecutionVertex taskVertex : taskVertices) {
+ if (taskVertex.getCurrentExecutions().size() > 1) {
+ vertexAttempts.put(
+ taskVertex.getParallelSubtaskIndex(),
+ taskVertex.getCurrentExecutionAttempt().getAttemptNumber());
+ }
ExecutionState state = taskVertex.getExecutionState();
countsPerStatus[state.ordinal()]++;
lastChanged = Math.max(lastChanged, taskVertex.getStateTimestamp(state));
}
+
+ if (!vertexAttempts.isEmpty()) {
+ currentExecutionAttempts.put(String.valueOf(ejv.getJobVertexId()), vertexAttempts);
+ }
}
lastChanged = Math.max(lastChanged, finished);
@@ -135,7 +184,8 @@ public class JobDetails implements Serializable {
status,
lastChanged,
countsPerStatus,
- numTotalTasks);
+ numTotalTasks,
+ currentExecutionAttempts);
}
// ------------------------------------------------------------------------
@@ -176,6 +226,9 @@ public class JobDetails implements Serializable {
return tasksPerState;
}
+ /**
+ * Returns the attempt numbers of the current (representative) execution attempts, keyed by
+ * JobVertexID string and then by subtask index. The internal map is returned as-is, without a
+ * defensive copy, and may be empty (e.g. when the legacy constructor was used).
+ *
+ * @return JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber
+ */
+ public Map<String, Map<Integer, Integer>> getCurrentExecutionAttempts() {
+ return currentExecutionAttempts;
+ }
// ------------------------------------------------------------------------
@Override
@@ -192,7 +245,8 @@ public class JobDetails implements Serializable {
&& this.status == that.status
&& this.jobId.equals(that.jobId)
&& this.jobName.equals(that.jobName)
- && Arrays.equals(this.tasksPerState, that.tasksPerState);
+ && Arrays.equals(this.tasksPerState, that.tasksPerState)
+ && this.currentExecutionAttempts.equals(that.currentExecutionAttempts);
} else {
return false;
}
@@ -208,6 +262,7 @@ public class JobDetails implements Serializable {
result = 31 * result + (int) (lastUpdateTime ^ (lastUpdateTime >>> 32));
result = 31 * result + Arrays.hashCode(tasksPerState);
result = 31 * result + numTasks;
+ result = 31 * result + currentExecutionAttempts.hashCode();
return result;
}
@@ -271,6 +326,20 @@ public class JobDetails implements Serializable {
jsonGenerator.writeEndObject();
+ if (!jobDetails.currentExecutionAttempts.isEmpty()) {
+ jsonGenerator.writeObjectFieldStart(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
+ for (Map.Entry<String, Map<Integer, Integer>> vertex :
+ jobDetails.currentExecutionAttempts.entrySet()) {
+ jsonGenerator.writeObjectFieldStart(vertex.getKey());
+ for (Map.Entry<Integer, Integer> attempt : vertex.getValue().entrySet()) {
+ jsonGenerator.writeNumberField(
+ String.valueOf(attempt.getKey()), attempt.getValue());
+ }
+ jsonGenerator.writeEndObject();
+ }
+ jsonGenerator.writeEndObject();
+ }
+
jsonGenerator.writeEndObject();
}
}
@@ -310,6 +379,28 @@ public class JobDetails implements Serializable {
jsonNode == null ? 0 : jsonNode.intValue();
}
+ Map<String, Map<Integer, Integer>> attempts = new HashMap<>();
+ JsonNode attemptsNode = rootNode.get(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
+ if (attemptsNode != null) {
+ attemptsNode
+ .fields()
+ .forEachRemaining(
+ vertex -> {
+ String vertexId = vertex.getKey();
+ Map<Integer, Integer> vertexAttempts =
+ attempts.computeIfAbsent(
+ vertexId, k -> new HashMap<>());
+ vertex.getValue()
+ .fields()
+ .forEachRemaining(
+ attempt ->
+ vertexAttempts.put(
+ Integer.parseInt(
+ attempt.getKey()),
+ attempt.getValue().intValue()));
+ });
+ }
+
return new JobDetails(
jobId,
jobName,
@@ -319,7 +410,8 @@ public class JobDetails implements Serializable {
jobStatus,
lastUpdateTime,
numVerticesPerExecutionState,
- numTasks);
+ numTasks,
+ attempts);
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index de423dadf9b..90187fed27f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -240,6 +240,7 @@ public class MetricDumpSerialization {
out.writeUTF(taskInfo.jobID);
out.writeUTF(taskInfo.vertexID);
out.writeInt(taskInfo.subtaskIndex);
+ out.writeInt(taskInfo.attemptNumber);
break;
case INFO_CATEGORY_OPERATOR:
QueryScopeInfo.OperatorQueryScopeInfo operatorInfo =
@@ -247,6 +248,7 @@ public class MetricDumpSerialization {
out.writeUTF(operatorInfo.jobID);
out.writeUTF(operatorInfo.vertexID);
out.writeInt(operatorInfo.subtaskIndex);
+ out.writeInt(operatorInfo.attemptNumber);
out.writeUTF(operatorInfo.operatorName);
break;
default:
@@ -436,6 +438,7 @@ public class MetricDumpSerialization {
String jobID;
String vertexID;
int subtaskIndex;
+ int attemptNumber;
String scope = dis.readUTF();
byte cat = dis.readByte();
@@ -452,14 +455,17 @@ public class MetricDumpSerialization {
jobID = dis.readUTF();
vertexID = dis.readUTF();
subtaskIndex = dis.readInt();
- return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope);
+ attemptNumber = dis.readInt();
+ return new QueryScopeInfo.TaskQueryScopeInfo(
+ jobID, vertexID, subtaskIndex, attemptNumber, scope);
case INFO_CATEGORY_OPERATOR:
jobID = dis.readUTF();
vertexID = dis.readUTF();
subtaskIndex = dis.readInt();
+ attemptNumber = dis.readInt();
String operatorName = dis.readUTF();
return new QueryScopeInfo.OperatorQueryScopeInfo(
- jobID, vertexID, subtaskIndex, operatorName, scope);
+ jobID, vertexID, subtaskIndex, attemptNumber, operatorName, scope);
default:
throw new IOException("Unknown scope category: " + cat);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index c61a9d76412..d0d9652f627 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -142,22 +142,30 @@ public abstract class QueryScopeInfo {
public final String jobID;
public final String vertexID;
public final int subtaskIndex;
+ public final int attemptNumber;
- public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex) {
- this(jobID, vertexid, subtaskIndex, "");
+ public TaskQueryScopeInfo(
+ String jobID, String vertexid, int subtaskIndex, int attemptNumber) {
+ this(jobID, vertexid, subtaskIndex, attemptNumber, "");
}
- public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String scope) {
+ public TaskQueryScopeInfo(
+ String jobID, String vertexid, int subtaskIndex, int attemptNumber, String scope) {
super(scope);
this.jobID = jobID;
this.vertexID = vertexid;
this.subtaskIndex = subtaskIndex;
+ this.attemptNumber = attemptNumber;
}
@Override
public TaskQueryScopeInfo copy(String additionalScope) {
return new TaskQueryScopeInfo(
- this.jobID, this.vertexID, this.subtaskIndex, concatScopes(additionalScope));
+ this.jobID,
+ this.vertexID,
+ this.subtaskIndex,
+ this.attemptNumber,
+ concatScopes(additionalScope));
}
@Override
@@ -174,23 +182,30 @@ public abstract class QueryScopeInfo {
public final String jobID;
public final String vertexID;
public final int subtaskIndex;
+ public final int attemptNumber;
public final String operatorName;
public OperatorQueryScopeInfo(
- String jobID, String vertexid, int subtaskIndex, String operatorName) {
- this(jobID, vertexid, subtaskIndex, operatorName, "");
+ String jobID,
+ String vertexid,
+ int subtaskIndex,
+ int attemptNumber,
+ String operatorName) {
+ this(jobID, vertexid, subtaskIndex, attemptNumber, operatorName, "");
}
public OperatorQueryScopeInfo(
String jobID,
String vertexid,
int subtaskIndex,
+ int attemptNumber,
String operatorName,
String scope) {
super(scope);
this.jobID = jobID;
this.vertexID = vertexid;
this.subtaskIndex = subtaskIndex;
+ this.attemptNumber = attemptNumber;
this.operatorName = operatorName;
}
@@ -200,6 +215,7 @@ public abstract class QueryScopeInfo {
this.jobID,
this.vertexID,
this.subtaskIndex,
+ this.attemptNumber,
this.operatorName,
concatScopes(additionalScope));
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
index 68a85ba5f2f..d075675b80c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
@@ -75,6 +75,7 @@ public class InternalOperatorMetricGroup extends ComponentMetricGroup<TaskMetric
this.parent.parent.jobId.toString(),
this.parent.vertexId.toString(),
this.parent.subtaskIndex,
+ this.parent.attemptNumber(),
filter.filterCharacters(this.operatorName));
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 0dc0ff9b6c5..afcbbaa44bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -134,7 +134,10 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo(
CharacterFilter filter) {
return new QueryScopeInfo.TaskQueryScopeInfo(
- this.parent.jobId.toString(), String.valueOf(this.vertexId), this.subtaskIndex);
+ this.parent.jobId.toString(),
+ String.valueOf(this.vertexId),
+ this.subtaskIndex,
+ this.attemptNumber);
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index beb652f97c7..65d2e162532 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -143,6 +143,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
toRetain.add(job.getJobId().toString());
}
metrics.retainJobs(toRetain);
+ metrics.updateCurrentExecutionAttempts(jobDetails.getJobs());
}
},
executor);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 93f289fa036..c86ac763f9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.handler.legacy.metrics;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -27,8 +28,10 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -54,6 +57,15 @@ public class MetricStore {
private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
+ /**
+ * The attempt number of the representative execution for each subtask of each vertex, nested as
+ * JobID -> JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber. Every metric of an
+ * execution attempt is stored in that attempt's store; it is additionally added to the
+ * SubtaskMetricStore and TaskMetricStore only when it belongs to the representative execution.
+ * A subtask without an entry is treated as having a single execution, so any of its attempts is
+ * considered representative.
+ *
+ * <p>Only the top-level map is a {@link ConcurrentHashMap}; the nested maps are the instances
+ * handed in via {@link #updateCurrentExecutionAttempts} and are stored without copying.
+ */
+ private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =
+ new ConcurrentHashMap<>();
+
/**
* Remove inactive task managers.
*
@@ -70,6 +82,18 @@ public class MetricStore {
*/
synchronized void retainJobs(List<String> activeJobs) {
jobs.keySet().retainAll(activeJobs);
+ currentExecutionAttempts.keySet().retainAll(activeJobs);
+ }
+
+ /**
+ * Records the current execution attempt numbers reported by the given jobs. The entry of every
+ * given job is replaced by its {@link JobDetails#getCurrentExecutionAttempts()} map (stored by
+ * reference). Entries of jobs not contained in {@code jobs} are left untouched; they are only
+ * dropped by {@link #retainJobs}.
+ *
+ * @param jobs details of the jobs whose current execution attempts should be recorded
+ */
+ // NOTE(review): the parameter name shadows the field {@code jobs} (JobID -> JobMetricStore);
+ // inside this method {@code jobs} refers to the parameter.
+ public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
+ jobs.forEach(
+ job ->
+ currentExecutionAttempts.put(
+ job.getJobId().toString(), job.getCurrentExecutionAttempts()));
+ }
+
+ /**
+ * Returns the live JobID -> JobVertexID -> SubtaskIndex -> attempt number map backing this
+ * store. Unlike the synchronized getters of this class, this method is not synchronized and does
+ * not wrap the result, so callers observe concurrent updates.
+ *
+ * <p>NOTE(review): this exposes internal mutable state; callers presumably should treat it as
+ * read-only — confirm whether an unmodifiable view was intended.
+ */
+ public Map<String, Map<String, Map<Integer, Integer>>> getCurrentExecutionAttempts() {
+ return currentExecutionAttempts;
}
/**
@@ -153,7 +177,24 @@ public class MetricStore {
if (task == null) {
return null;
}
- return ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
+ return SubtaskMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
+ }
+
+ /**
+ * Returns the metrics of a single execution attempt of a subtask.
+ *
+ * <p>The subtask-level store returned by {@link #getSubtaskMetricStore} only receives metrics of
+ * the representative attempt, whereas this method gives access to the metrics of any attempt.
+ *
+ * @param jobID the job ID; {@code null} yields {@code null}
+ * @param taskID the job vertex ID
+ * @param subtaskIndex the subtask index
+ * @param attemptNumber the execution attempt number
+ * @return an unmodifiable view of the attempt's metrics, or {@code null} if the job, task or
+ *     subtask is unknown
+ */
+ public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
+ String jobID, String taskID, int subtaskIndex, int attemptNumber) {
+ JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+ if (job == null) {
+ return null;
+ }
+ TaskMetricStore task = job.getTaskMetricStore(taskID);
+ if (task == null) {
+ return null;
+ }
+ SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex);
+ if (subtask == null) {
+ return null;
+ }
+ // NOTE(review): getAttemptsMetricStore returns null for an unknown attempt; this presumably
+ // relies on ComponentMetricStore.unmodifiable being null-safe (as
+ // SubtaskMetricStore.unmodifiable is) — confirm.
+ return ComponentMetricStore.unmodifiable(subtask.getAttemptsMetricStore(attemptNumber));
}
public synchronized Map<String, JobMetricStore> getJobs() {
@@ -177,7 +218,9 @@ public class MetricStore {
TaskManagerMetricStore tm;
JobMetricStore job;
TaskMetricStore task;
- ComponentMetricStore subtask;
+ SubtaskMetricStore subtask;
+ ComponentMetricStore attempt;
+ boolean isRepresentativeAttempt;
String name = info.scope.isEmpty() ? metric.name : info.scope + "." + metric.name;
@@ -214,15 +257,34 @@ public class MetricStore {
task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
subtask =
task.subtasks.computeIfAbsent(
- taskInfo.subtaskIndex, k -> new ComponentMetricStore());
- /**
- * The duplication is intended. Metrics scoped by subtask are useful for several
- * job/task handlers, while the WebInterface task metric queries currently do
- * not account for subtasks, so we don't divide by subtask and instead use the
- * concatenation of subtask index and metric name as the name for those.
- */
- addMetric(subtask.metrics, name, metric);
- addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+ taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+ // The attempt is the representative one if its number equals the current
+ // execution attempt number recorded for the subtask in currentExecutionAttempts,
+ // or if no number is recorded there, which means there should be only one execution.
+ isRepresentativeAttempt =
+ isRepresentativeAttempt(
+ taskInfo.jobID,
+ taskInfo.vertexID,
+ taskInfo.subtaskIndex,
+ taskInfo.attemptNumber);
+ attempt =
+ subtask.attempts.computeIfAbsent(
+ taskInfo.attemptNumber, k -> new ComponentMetricStore());
+ addMetric(attempt.metrics, name, metric);
+ // If the attempt is the representative one, its metrics are also added to the
+ // subtask and task metric stores.
+ if (isRepresentativeAttempt) {
+ /**
+ * The duplication is intended. Metrics scoped by subtask are useful for
+ * several job/task handlers, while the WebInterface task metric queries
+ * currently do not account for subtasks, so we don't divide by subtask and
+ * instead use the concatenation of subtask index and metric name as the
+ * name for those.
+ */
+ addMetric(subtask.metrics, name, metric);
+ addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+ }
break;
case INFO_CATEGORY_OPERATOR:
QueryScopeInfo.OperatorQueryScopeInfo operatorInfo =
@@ -233,21 +295,38 @@ public class MetricStore {
operatorInfo.vertexID, k -> new TaskMetricStore());
subtask =
task.subtasks.computeIfAbsent(
- operatorInfo.subtaskIndex, k -> new ComponentMetricStore());
- /**
- * As the WebInterface does not account for operators (because it can't) we
- * don't divide by operator and instead use the concatenation of subtask index,
- * operator name and metric name as the name.
- */
- addMetric(subtask.metrics, operatorInfo.operatorName + "." + name, metric);
- addMetric(
- task.metrics,
- operatorInfo.subtaskIndex
- + "."
- + operatorInfo.operatorName
- + "."
- + name,
- metric);
+ operatorInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+ isRepresentativeAttempt =
+ isRepresentativeAttempt(
+ operatorInfo.jobID,
+ operatorInfo.vertexID,
+ operatorInfo.subtaskIndex,
+ operatorInfo.attemptNumber);
+
+ attempt =
+ subtask.attempts.computeIfAbsent(
+ operatorInfo.attemptNumber, k -> new ComponentMetricStore());
+ addMetric(attempt.metrics, operatorInfo.operatorName + "." + name, metric);
+
+ // If the attempt is the representative one, its metrics are also added to the
+ // subtask and task metric stores.
+ if (isRepresentativeAttempt) {
+ /**
+ * As the WebInterface does not account for operators (because it can't) we
+ * don't divide by operator and instead use the concatenation of subtask
+ * index, operator name and metric name as the name.
+ */
+ addMetric(subtask.metrics, operatorInfo.operatorName + "." + name, metric);
+ addMetric(
+ task.metrics,
+ operatorInfo.subtaskIndex
+ + "."
+ + operatorInfo.operatorName
+ + "."
+ + name,
+ metric);
+ }
break;
default:
LOG.debug("Invalid metric dump category: " + info.getCategory());
@@ -257,6 +336,19 @@ public class MetricStore {
}
}
+ /**
+ * Returns whether the given execution attempt is the representative one for its subtask.
+ *
+ * <p>This is the case if the attempt number recorded in {@link #currentExecutionAttempts} for
+ * the subtask equals {@code attemptNumber}, or if nothing is recorded for the job, the vertex or
+ * the subtask (which means the subtask should have only one execution).
+ *
+ * @param jobID the job ID
+ * @param vertexID the job vertex ID
+ * @param subtaskIndex the subtask index
+ * @param attemptNumber the attempt number of the execution that reported the metric
+ * @return {@code true} if metrics of this attempt should also be stored at subtask/task level
+ */
+ private boolean isRepresentativeAttempt(
+ String jobID, String vertexID, int subtaskIndex, int attemptNumber) {
+ // orElse(...) yields an Integer; comparing it with the int operand unboxes it, so this is a
+ // numeric comparison rather than a reference comparison.
+ return Optional.of(currentExecutionAttempts)
+ .map(m -> m.get(jobID))
+ .map(m -> m.get(vertexID))
+ .map(m -> m.get(subtaskIndex))
+ .orElse(attemptNumber)
+ == attemptNumber;
+ }
+
private void addMetric(Map<String, String> target, String name, MetricDump metric) {
switch (metric.getCategory()) {
case METRIC_CATEGORY_COUNTER:
@@ -363,24 +455,24 @@ public class MetricStore {
/** Sub-structure containing metrics of a single Task. */
@ThreadSafe
public static class TaskMetricStore extends ComponentMetricStore {
- private final Map<Integer, ComponentMetricStore> subtasks;
+ private final Map<Integer, SubtaskMetricStore> subtasks;
private TaskMetricStore() {
this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
}
private TaskMetricStore(
- Map<String, String> metrics, Map<Integer, ComponentMetricStore> subtasks) {
+ Map<String, String> metrics, Map<Integer, SubtaskMetricStore> subtasks) {
super(metrics);
this.subtasks = checkNotNull(subtasks);
}
- public ComponentMetricStore getSubtaskMetricStore(int subtaskIndex) {
+ public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) {
return subtasks.get(subtaskIndex);
}
- public Map<Integer, ComponentMetricStore> getAllSubtaskMetricStores() {
- return subtasks;
+ public Map<Integer, SubtaskMetricStore> getAllSubtaskMetricStores() {
+ return unmodifiableMap(subtasks);
}
private static TaskMetricStore unmodifiable(TaskMetricStore source) {
@@ -391,4 +483,36 @@ public class MetricStore {
unmodifiableMap(source.metrics), unmodifiableMap(source.subtasks));
}
}
+
+ /**
+ * Sub-structure containing metrics of a single subtask.
+ *
+ * <p>The inherited metrics are those of the subtask's representative execution attempt, while
+ * {@code attempts} holds one store per execution attempt, keyed by attempt number.
+ */
+ @ThreadSafe
+ public static class SubtaskMetricStore extends ComponentMetricStore {
+ private final Map<Integer, ComponentMetricStore> attempts;
+
+ /** Creates an empty store backed by concurrent maps. */
+ private SubtaskMetricStore() {
+ this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
+ }
+
+ /**
+ * Creates a store over the given maps. {@code attempts} is used directly (not copied);
+ * {@code metrics} is handed to the {@link ComponentMetricStore} constructor.
+ *
+ * @param metrics the subtask-level metrics
+ * @param attempts the per-attempt metric stores keyed by attempt number; must not be null
+ */
+ private SubtaskMetricStore(
+ Map<String, String> metrics, Map<Integer, ComponentMetricStore> attempts) {
+ super(metrics);
+ this.attempts = checkNotNull(attempts);
+ }
+
+ /**
+ * Returns the metric store of the given execution attempt.
+ *
+ * @param attemptNumber the execution attempt number
+ * @return the attempt's metric store, or {@code null} if no store exists for that attempt
+ */
+ public ComponentMetricStore getAttemptsMetricStore(int attemptNumber) {
+ return attempts.get(attemptNumber);
+ }
+
+ /**
+ * Returns all per-attempt metric stores keyed by attempt number, as an unmodifiable view. The
+ * contained stores themselves are not wrapped.
+ */
+ public Map<Integer, ComponentMetricStore> getAllAttemptsMetricStores() {
+ return unmodifiableMap(attempts);
+ }
+
+ /**
+ * Returns a read-only view of {@code source}, or {@code null} if {@code source} is null. Only
+ * the two maps are wrapped; the per-attempt stores inside are shared with {@code source}.
+ */
+ private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
+ if (source == null) {
+ return null;
+ }
+ return new SubtaskMetricStore(
+ unmodifiableMap(source.metrics), unmodifiableMap(source.attempts));
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 7da9061a090..8ba34a10d85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -99,8 +99,11 @@ public class MutableIOMetrics extends IOMetrics {
fetcher.update();
MetricStore.ComponentMetricStore metrics =
fetcher.getMetricStore()
- .getSubtaskMetricStore(
- jobID, taskID, attempt.getParallelSubtaskIndex());
+ .getSubtaskAttemptMetricStore(
+ jobID,
+ taskID,
+ attempt.getParallelSubtaskIndex(),
+ attempt.getAttemptNumber());
if (metrics != null) {
/**
* We want to keep track of missing metrics to be able to make a difference
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
index 790ca43ce7d..609ef1f8e0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -29,6 +29,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -102,4 +104,33 @@ class JobDetailsTest {
assertThat(unmarshalled).isEqualTo(expected);
}
+
+ /**
+ * Checks that a JobDetails carrying a non-empty currentExecutionAttempts map (two vertices, one
+ * of them with two subtasks) survives a JSON round trip through the strict REST ObjectMapper.
+ */
+ @Test
+ void testJobDetailsWithExecutionAttemptsMarshalling() throws JsonProcessingException {
+ Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
+ currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(1, 2);
+ currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(2, 4);
+ currentExecutionAttempts.computeIfAbsent("b", k -> new HashMap<>()).put(3, 1);
+
+ final JobDetails expected =
+ new JobDetails(
+ new JobID(),
+ "foobar",
+ 1L,
+ 10L,
+ 9L,
+ JobStatus.RUNNING,
+ 8L,
+ new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
+ 42,
+ currentExecutionAttempts);
+
+ final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+ final JsonNode marshalled = objectMapper.valueToTree(expected);
+
+ final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
+
+ // JobDetails.equals also compares currentExecutionAttempts, so this checks the map too.
+ assertThat(unmarshalled).isEqualTo(expected);
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 52eec21ab5d..2876cf1635a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -159,11 +159,12 @@ class MetricDumpSerializerTest {
gauges.put(
g1,
new Tuple2<QueryScopeInfo, String>(
- new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
+ new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, 0, "D"), "g1"));
histograms.put(
h1,
new Tuple2<QueryScopeInfo, String>(
- new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"),
+ new QueryScopeInfo.OperatorQueryScopeInfo(
+ "jid", "vid", 2, 0, "opname", "E"),
"h1"));
MetricDumpSerialization.MetricSerializationResult serialized =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
index 0355d01fcf0..7380cd004b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -94,7 +94,7 @@ class QueryScopeInfoTest {
@Test
void testTaskQueryScopeInfo() {
QueryScopeInfo.TaskQueryScopeInfo info =
- new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2);
+ new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, 0);
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
assertThat(info.scope).isEmpty();
assertThat(info.jobID).isEqualTo("jobid");
@@ -108,7 +108,7 @@ class QueryScopeInfoTest {
assertThat(info.vertexID).isEqualTo("taskid");
assertThat(info.subtaskIndex).isEqualTo(2);
- info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, "hello");
+ info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, 0, "hello");
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
assertThat(info.scope).isEqualTo("hello");
assertThat(info.jobID).isEqualTo("jobid");
@@ -126,7 +126,7 @@ class QueryScopeInfoTest {
@Test
void testOperatorQueryScopeInfo() {
QueryScopeInfo.OperatorQueryScopeInfo info =
- new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname");
+ new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, 0, "opname");
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
assertThat(info.scope).isEmpty();
assertThat(info.jobID).isEqualTo("jobid");
@@ -142,7 +142,9 @@ class QueryScopeInfoTest {
assertThat(info.operatorName).isEqualTo("opname");
assertThat(info.subtaskIndex).isEqualTo(2);
- info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname", "hello");
+ info =
+ new QueryScopeInfo.OperatorQueryScopeInfo(
+ "jobid", "taskid", 2, 0, "opname", "hello");
assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
assertThat(info.scope).isEqualTo("hello");
assertThat(info.jobID).isEqualTo("jobid");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index 77cd5047d3e..85bf3a44cd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -77,6 +77,7 @@ class JobVertexBackPressureHandlerTest {
new TaskQueryScopeInfo(
TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
TEST_JOB_VERTEX_ID.toString(),
+ 0,
0);
dumps.add(new GaugeDump(task0, MetricNames.TASK_BACK_PRESSURED_TIME, "1000"));
dumps.add(new GaugeDump(task0, MetricNames.TASK_IDLE_TIME, "0"));
@@ -86,7 +87,8 @@ class JobVertexBackPressureHandlerTest {
new TaskQueryScopeInfo(
TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
TEST_JOB_VERTEX_ID.toString(),
- 1);
+ 1,
+ 0);
dumps.add(new GaugeDump(task1, MetricNames.TASK_BACK_PRESSURED_TIME, "500"));
dumps.add(new GaugeDump(task1, MetricNames.TASK_IDLE_TIME, "100"));
dumps.add(new GaugeDump(task1, MetricNames.TASK_BUSY_TIME, "900"));
@@ -97,7 +99,8 @@ class JobVertexBackPressureHandlerTest {
new TaskQueryScopeInfo(
TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
TEST_JOB_VERTEX_ID.toString(),
- 3);
+ 3,
+ 0);
dumps.add(new GaugeDump(task3, MetricNames.TASK_BACK_PRESSURED_TIME, "100"));
dumps.add(new GaugeDump(task3, MetricNames.TASK_IDLE_TIME, "200"));
dumps.add(new GaugeDump(task3, MetricNames.TASK_BUSY_TIME, "700"));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
index b04020ab411..ed13c7da040 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -65,19 +65,19 @@ public class AggregatingSubtasksMetricsHandlerTest
Collection<MetricDump> dumps = new ArrayList<>(3);
QueryScopeInfo.TaskQueryScopeInfo task1 =
new QueryScopeInfo.TaskQueryScopeInfo(
- JOB_ID.toString(), TASK_ID.toString(), 1, "abc");
+ JOB_ID.toString(), TASK_ID.toString(), 1, 0, "abc");
MetricDump.CounterDump cd1 = new MetricDump.CounterDump(task1, "metric1", 1);
dumps.add(cd1);
QueryScopeInfo.TaskQueryScopeInfo task2 =
new QueryScopeInfo.TaskQueryScopeInfo(
- JOB_ID.toString(), TASK_ID.toString(), 2, "abc");
+ JOB_ID.toString(), TASK_ID.toString(), 2, 0, "abc");
MetricDump.CounterDump cd2 = new MetricDump.CounterDump(task2, "metric1", 3);
dumps.add(cd2);
QueryScopeInfo.TaskQueryScopeInfo task3 =
new QueryScopeInfo.TaskQueryScopeInfo(
- JOB_ID.toString(), TASK_ID.toString(), 3, "abc");
+ JOB_ID.toString(), TASK_ID.toString(), 3, 0, "abc");
MetricDump.CounterDump cd3 = new MetricDump.CounterDump(task3, "metric2", 5);
dumps.add(cd3);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
index 66ea8c22c00..b48dbbac38f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
@@ -34,6 +34,8 @@ public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVerte
private static final int TEST_SUBTASK_INDEX = 1;
+ private static final int TEST_ATTEMPT_NUMBER = 0;
+
@Override
JobVertexMetricsHandler getMetricsHandler() {
return new JobVertexMetricsHandler(
@@ -43,7 +45,7 @@ public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVerte
@Override
QueryScopeInfo getQueryScopeInfo() {
return new QueryScopeInfo.TaskQueryScopeInfo(
- TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX);
+ TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX, TEST_ATTEMPT_NUMBER);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
index f59569c1d12..30559e56aee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
@@ -37,6 +37,8 @@ public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMet
private static final int TEST_SUBTASK_INDEX = 0;
+ private static final int TEST_ATTEMPT_NUMBER = 0;
+
@Override
SubtaskMetricsHandler getMetricsHandler() {
return new SubtaskMetricsHandler(leaderRetriever, TIMEOUT, TEST_HEADERS, mockMetricFetcher);
@@ -45,7 +47,7 @@ public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMet
@Override
QueryScopeInfo getQueryScopeInfo() {
return new QueryScopeInfo.TaskQueryScopeInfo(
- TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX);
+ TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX, TEST_ATTEMPT_NUMBER);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 9dc014d8b50..a7259988672 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -177,12 +177,13 @@ public class MetricFetcherTest extends TestLogger {
c1,
new Tuple2<>(
new QueryScopeInfo.OperatorQueryScopeInfo(
- jobID.toString(), "taskid", 2, "opname", "abc"),
+ jobID.toString(), "taskid", 2, 0, "opname", "abc"),
"oc"));
counters.put(
c2,
new Tuple2<>(
- new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"),
+ new QueryScopeInfo.TaskQueryScopeInfo(
+ jobID.toString(), "taskid", 2, 0, "abc"),
"tc"));
meters.put(
new Meter() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 97c739df224..c54f50291fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -41,7 +44,18 @@ class MetricStoreTest {
assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric4", "-1")).isEqualTo("3");
assertThat(store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric5", "-1"))
+ .isEqualTo("14");
+ assertThat(store.getSubtaskMetricStore("jobid", "taskid", 8).getMetric("abc.metric5", "-1"))
+ .isEqualTo("14");
+ assertThat(
+ store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 1)
+ .getMetric("abc.metric5", "-1"))
.isEqualTo("4");
+ assertThat(
+ store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2)
+ .getMetric("abc.metric5", "-1"))
+ .isEqualTo("14");
+
assertThat(
store.getTaskMetricStore("jobid", "taskid")
.getMetric("8.opname.abc.metric6", "-1"))
@@ -50,6 +64,27 @@ class MetricStoreTest {
store.getTaskMetricStore("jobid", "taskid")
.getMetric("8.opname.abc.metric7", "-1"))
.isEqualTo("6");
+ assertThat(
+ store.getTaskMetricStore("jobid", "taskid")
+ .getMetric("1.opname.abc.metric7", "-1"))
+ .isEqualTo("6");
+ assertThat(
+ store.getSubtaskMetricStore("jobid", "taskid", 1)
+ .getMetric("opname.abc.metric7", "-1"))
+ .isEqualTo("6");
+ assertThat(store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 2)).isNull();
+ assertThat(
+ store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 3)
+ .getMetric("opname.abc.metric7", "-1"))
+ .isEqualTo("6");
+ assertThat(
+ store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2)
+ .getMetric("opname.abc.metric7", "-1"))
+ .isEqualTo("6");
+ assertThat(
+ store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 4)
+ .getMetric("opname.abc.metric7", "-1"))
+ .isEqualTo("16");
}
@Test
@@ -72,6 +107,11 @@ class MetricStoreTest {
}
static MetricStore setupStore(MetricStore store) {
+ Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
+ currentExecutionAttempts.put(8, 2);
+ store.getCurrentExecutionAttempts()
+ .put("jobid", Collections.singletonMap("taskid", currentExecutionAttempts));
+
QueryScopeInfo.JobManagerQueryScopeInfo jm =
new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0);
@@ -96,19 +136,30 @@ class MetricStoreTest {
MetricDump.CounterDump cd42 = new MetricDump.CounterDump(job2, "metric4", 3);
QueryScopeInfo.TaskQueryScopeInfo task =
- new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc");
+ new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 1, "abc");
MetricDump.CounterDump cd5 = new MetricDump.CounterDump(task, "metric5", 4);
+ QueryScopeInfo.TaskQueryScopeInfo speculativeTask =
+ new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 2, "abc");
+ MetricDump.CounterDump cd52 = new MetricDump.CounterDump(speculativeTask, "metric5", 14);
+
QueryScopeInfo.OperatorQueryScopeInfo operator =
- new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc");
+ new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 2, "opname", "abc");
MetricDump.CounterDump cd6 = new MetricDump.CounterDump(operator, "metric6", 5);
MetricDump.CounterDump cd7 = new MetricDump.CounterDump(operator, "metric7", 6);
QueryScopeInfo.OperatorQueryScopeInfo operator2 =
- new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, "opname", "abc");
+ new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, 3, "opname", "abc");
MetricDump.CounterDump cd62 = new MetricDump.CounterDump(operator2, "metric6", 5);
MetricDump.CounterDump cd72 = new MetricDump.CounterDump(operator2, "metric7", 6);
+ QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator2 =
+ new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 4, "opname", "abc");
+ MetricDump.CounterDump cd63 =
+ new MetricDump.CounterDump(speculativeOperator2, "metric6", 15);
+ MetricDump.CounterDump cd73 =
+ new MetricDump.CounterDump(speculativeOperator2, "metric7", 16);
+
store.add(cd1);
store.add(cd2);
store.add(cd2a);
@@ -125,6 +176,10 @@ class MetricStoreTest {
store.add(cd32);
store.add(cd42);
+ store.add(cd52);
+ store.add(cd63);
+ store.add(cd73);
+
return store;
}
}