You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/09 14:04:48 UTC
flink git commit: [FLINK-7575] [webui] Show "Fetching" instead of 0
when IO metrics are not yet retrieved
Repository: flink
Updated Branches:
refs/heads/master e14caef37 -> c7968e9c2
[FLINK-7575] [webui] Show "Fetching" instead of 0 when IO metrics are not yet retrieved
This closes #4647.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7968e9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7968e9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7968e9c
Branch: refs/heads/master
Commit: c7968e9c2ad6601f9f38b673ead5bc71611a6fbd
Parents: e14caef
Author: James Lafa <ja...@data-artisans.com>
Authored: Tue Sep 5 11:33:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Oct 9 14:07:19 2017 +0200
----------------------------------------------------------------------
.../jobs/job.plan.node-list.subtasks.jade | 18 ++++-
.../partials/jobs/job.plan.node.subtasks.jade | 17 ++--
.../jobs/job.plan.node.taskmanagers.jade | 18 +++--
.../jobs/job.plan.node-list.subtasks.html | 8 +-
.../partials/jobs/job.plan.node.subtasks.html | 8 +-
.../jobs/job.plan.node.taskmanagers.html | 8 +-
.../handler/legacy/metrics/MetricStore.java | 4 +
.../rest/handler/util/MutableIOMetrics.java | 85 +++++++++++++++++---
8 files changed, 125 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
index 3bf6f1b..1411f3f 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
@@ -45,10 +45,20 @@ table.table.table-body-hover.table-clickable.table-activable
span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}") {{v.duration | humanizeDuration:true}}
td.td-long {{ v.name | humanizeText }}
- td(title="{{v.metrics['read-bytes']}} bytes") {{ v.metrics['read-bytes'] | humanizeBytes }}
- td {{ v.metrics['read-records'] | number }}
- td(title="{{v.metrics['write-bytes']}} bytes") {{ v.metrics['write-bytes'] | humanizeBytes }}
- td {{ v.metrics['write-records'] | number }}
+
+ td
+ span(ng-if="v.metrics['read-bytes-complete']") {{ v.metrics['read-bytes'] | humanizeBytes }}
+ i(ng-if="!v.metrics['read-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
+ td
+ span(ng-if="v.metrics['read-records-complete']") {{ v.metrics['read-records'] | number }}
+ i(ng-if="!v.metrics['read-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
+ td
+ span(ng-if="v.metrics['write-bytes-complete']") {{ v.metrics['write-bytes'] | humanizeBytes }}
+ i(ng-if="!v.metrics['write-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
+ td
+ span(ng-if="v.metrics['write-records-complete']") {{ v.metrics['write-records'] | number }}
+ i(ng-if="!v.metrics['write-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
+
td {{ v.parallelism }}
td
.label-group
http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
index 78b0999..432feb4 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
@@ -32,7 +32,6 @@ table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="subta
th Attempt
th Host
th Status
-
tbody
tr(ng-repeat="subtask in subtasks | orderBy:'host'")
td
@@ -41,17 +40,19 @@ table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="subta
span(ng-if="subtask['end-time'] > -1") {{ subtask['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
td
span(ng-if="subtask.duration > -1" title="{{subtask.duration | humanizeDuration:false}}") {{subtask.duration | humanizeDuration:true}}
-
+
td
- span(ng-if="subtask.metrics['read-bytes'] > -1" title="{{subtask.metrics['read-bytes']}} bytes")
- | {{ subtask.metrics['read-bytes'] | humanizeBytes}}
+ span(ng-if="subtask.metrics['read-bytes-complete']") {{ subtask.metrics['read-bytes'] | humanizeBytes }}
+ i(ng-if="!subtask.metrics['read-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
td
- span(ng-if="subtask.metrics['read-records'] > -1") {{ subtask.metrics['read-records'] | number }}
+ span(ng-if="subtask.metrics['read-records-complete']") {{ subtask.metrics['read-records'] | number }}
+ i(ng-if="!subtask.metrics['read-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
td
- span(ng-if="subtask.metrics['write-bytes'] > -1" title="{{subtask.metrics['write-bytes']}} bytes")
- | {{ subtask.metrics['write-bytes'] | humanizeBytes}}
+ span(ng-if="subtask.metrics['write-bytes-complete']") {{ subtask.metrics['write-bytes'] | humanizeBytes }}
+ i(ng-if="!subtask.metrics['write-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
td
- span(ng-if="subtask.metrics['write-records'] > -1") {{ subtask.metrics['write-records'] | number }}
+ span(ng-if="subtask.metrics['write-records-complete']") {{ subtask.metrics['write-records'] | number }}
+ i(ng-if="!subtask.metrics['write-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
td {{ subtask.attempt + 1 }}
td {{ subtask.host }}
http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
index 75a2031..336f48e 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
@@ -36,18 +36,20 @@ table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="taskm
td
span(ng-if="v['end-time'] > -1") {{ v['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
td
- span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}") {{v.duration | humanizeDuration:true}}
-
+ span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}") {{v.duration | humanizeDuration:true}}
+
td
- span(ng-if="tm.metrics['read-bytes'] > -1" title="{{tm.metrics['read-bytes']}} bytes")
- | {{ tm.metrics['read-bytes'] | humanizeBytes}}
+ span(ng-if="tm.metrics['read-bytes-complete']") {{ tm.metrics['read-bytes'] | humanizeBytes }}
+ i(ng-if="!tm.metrics['read-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
td
- span(ng-if="tm.metrics['read-records'] > -1") {{ tm.metrics['read-records'] | number }}
+ span(ng-if="tm.metrics['read-records-complete']") {{ tm.metrics['read-records'] | number }}
+ i(ng-if="!tm.metrics['read-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
td
- span(ng-if="tm.metrics['write-bytes'] > -1" title="{{tm.metrics['write-bytes']}} bytes")
- | {{ tm.metrics['write-bytes'] | humanizeBytes}}
+ span(ng-if="tm.metrics['write-bytes-complete']") {{ tm.metrics['write-bytes'] | humanizeBytes }}
+ i(ng-if="!tm.metrics['write-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
td
- span(ng-if="tm.metrics['write-records'] > -1") {{ tm.metrics['write-records'] | number }}
+ span(ng-if="tm.metrics['write-records-complete']") {{ tm.metrics['write-records'] | number }}
+ i(ng-if="!tm.metrics['write-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
td {{ tm.host }}
http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html
index 4dc8996..f960ebd 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html
@@ -43,10 +43,10 @@ limitations under the License.
<td><span ng-if="v['end-time'] > -1">{{ v['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
<td><span ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}">{{v.duration | humanizeDuration:true}}</span></td>
<td class="td-long">{{ v.name | humanizeText }}</td>
- <td title="{{v.metrics['read-bytes']}} bytes">{{ v.metrics['read-bytes'] | humanizeBytes }}</td>
- <td>{{ v.metrics['read-records'] | number }}</td>
- <td title="{{v.metrics['write-bytes']}} bytes">{{ v.metrics['write-bytes'] | humanizeBytes }}</td>
- <td>{{ v.metrics['write-records'] | number }}</td>
+ <td><span ng-if="v.metrics['read-bytes-complete']">{{ v.metrics['read-bytes'] | humanizeBytes }}</span><i ng-if="!v.metrics['read-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+ <td><span ng-if="v.metrics['read-records-complete']">{{ v.metrics['read-records'] | number }}</span><i ng-if="!v.metrics['read-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+ <td><span ng-if="v.metrics['write-bytes-complete']">{{ v.metrics['write-bytes'] | humanizeBytes }}</span><i ng-if="!v.metrics['write-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+ <td><span ng-if="v.metrics['write-records-complete']">{{ v.metrics['write-records'] | number }}</span><i ng-if="!v.metrics['write-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
<td>{{ v.parallelism }}</td>
<td>
<div class="label-group">
http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
index ae568e8..cb0e921 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
@@ -38,10 +38,10 @@ limitations under the License.
<td><span ng-if="subtask['start-time'] > -1">{{ subtask['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
<td><span ng-if="subtask['end-time'] > -1">{{ subtask['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
<td><span ng-if="subtask.duration > -1" title="{{subtask.duration | humanizeDuration:false}}">{{subtask.duration | humanizeDuration:true}}</span></td>
- <td><span ng-if="subtask.metrics['read-bytes'] > -1" title="{{subtask.metrics['read-bytes']}} bytes">{{ subtask.metrics['read-bytes'] | humanizeBytes}}</span></td>
- <td><span ng-if="subtask.metrics['read-records'] > -1">{{ subtask.metrics['read-records'] | number }}</span></td>
- <td><span ng-if="subtask.metrics['write-bytes'] > -1" title="{{subtask.metrics['write-bytes']}} bytes">{{ subtask.metrics['write-bytes'] | humanizeBytes}}</span></td>
- <td><span ng-if="subtask.metrics['write-records'] > -1">{{ subtask.metrics['write-records'] | number }}</span></td>
+ <td><span ng-if="subtask.metrics['read-bytes-complete']">{{ subtask.metrics['read-bytes'] | humanizeBytes }}</span><i ng-if="!subtask.metrics['read-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+ <td><span ng-if="subtask.metrics['read-records-complete']">{{ subtask.metrics['read-records'] | number }}</span><i ng-if="!subtask.metrics['read-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+ <td><span ng-if="subtask.metrics['write-bytes-complete']">{{ subtask.metrics['write-bytes'] | humanizeBytes }}</span><i ng-if="!subtask.metrics['write-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+ <td><span ng-if="subtask.metrics['write-records-complete']">{{ subtask.metrics['write-records'] | number }}</span><i ng-if="!subtask.metrics['write-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
<td>{{ subtask.attempt + 1 }}</td>
<td>{{ subtask.host }}</td>
<td>
http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html
index 39b30ee..0f19a9e 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html
@@ -37,10 +37,10 @@ limitations under the License.
<td><span ng-if="v['start-time'] > -1">{{ v['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
<td><span ng-if="v['end-time'] > -1">{{ v['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
<td><span ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}">{{v.duration | humanizeDuration:true}}</span></td>
- <td><span ng-if="tm.metrics['read-bytes'] > -1" title="{{tm.metrics['read-bytes']}} bytes">{{ tm.metrics['read-bytes'] | humanizeBytes}}</span></td>
- <td><span ng-if="tm.metrics['read-records'] > -1">{{ tm.metrics['read-records'] | number }}</span></td>
- <td><span ng-if="tm.metrics['write-bytes'] > -1" title="{{tm.metrics['write-bytes']}} bytes">{{ tm.metrics['write-bytes'] | humanizeBytes}}</span></td>
- <td><span ng-if="tm.metrics['write-records'] > -1">{{ tm.metrics['write-records'] | number }}</span></td>
+ <td><span ng-if="tm.metrics['read-bytes-complete']">{{ tm.metrics['read-bytes'] | humanizeBytes }}</span><i ng-if="!tm.metrics['read-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+ <td><span ng-if="tm.metrics['read-records-complete']">{{ tm.metrics['read-records'] | number }}</span><i ng-if="!tm.metrics['read-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+ <td><span ng-if="tm.metrics['write-bytes-complete']">{{ tm.metrics['write-bytes'] | humanizeBytes }}</span><i ng-if="!tm.metrics['write-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+ <td><span ng-if="tm.metrics['write-records-complete']">{{ tm.metrics['write-records'] | number }}</span><i ng-if="!tm.metrics['write-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
<td>{{ tm.host }}</td>
<td>
<div class="label-group">
http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 6d3fc99..9c13ab8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -250,6 +250,10 @@ public class MetricStore {
private abstract static class ComponentMetricStore {
public final Map<String, String> metrics = new HashMap<>();
+ public String getMetric(String name) {
+ return this.metrics.get(name);
+ }
+
public String getMetric(String name, String defaultValue) {
String value = this.metrics.get(name);
return value != null
http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index e2aaaf7..2f5a7c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -44,6 +44,11 @@ import java.io.IOException;
public class MutableIOMetrics extends IOMetrics {
private static final long serialVersionUID = -5460777634971381737L;
+ private boolean numBytesInLocalComplete = true;
+ private boolean numBytesInRemoteComplete = true;
+ private boolean numBytesOutComplete = true;
+ private boolean numRecordsInComplete = true;
+ private boolean numRecordsOutComplete = true;
public MutableIOMetrics() {
super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
@@ -72,13 +77,58 @@ public class MutableIOMetrics extends IOMetrics {
} else { // execAttempt is still running, use MetricQueryService instead
if (fetcher != null) {
fetcher.update();
- MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
- if (metrics != null) {
- this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
- this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
- this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
- this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
- this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+ MetricStore metricStore = fetcher.getMetricStore();
+ synchronized (metricStore) {
+ MetricStore.SubtaskMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
+ if (metrics != null) {
+ /**
+ * We want to keep track of missing metrics to be able to make a difference between 0 as a value
+ * and a missing value.
+ * In case a metric is missing for a parallel instance of a task, we set the complete flag as
+ * false.
+ */
+ if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL) == null){
+ this.numBytesInLocalComplete = false;
+ }
+ else {
+ this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL));
+ }
+
+ if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE) == null){
+ this.numBytesInRemoteComplete = false;
+ }
+ else {
+ this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE));
+ }
+
+ if (metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT) == null){
+ this.numBytesOutComplete = false;
+ }
+ else {
+ this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT));
+ }
+
+ if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN) == null){
+ this.numRecordsInComplete = false;
+ }
+ else {
+ this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN));
+ }
+
+ if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT) == null){
+ this.numRecordsOutComplete = false;
+ }
+ else {
+ this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT));
+ }
+ }
+ else {
+ this.numBytesInLocalComplete = false;
+ this.numBytesInRemoteComplete = false;
+ this.numBytesOutComplete = false;
+ this.numRecordsInComplete = false;
+ this.numRecordsOutComplete = false;
+ }
}
}
}
@@ -90,20 +140,37 @@ public class MutableIOMetrics extends IOMetrics {
* <p>The JSON structure written is as follows:
* "metrics": {
* "read-bytes": 1,
+ * "read-bytes-complete": true,
* "write-bytes": 2,
+ * "write-bytes-complete": true,
* "read-records": 3,
- * "write-records": 4
+ * "read-records-complete": true,
+ * "write-records": 4,
+ * "write-records-complete": true
* }
*
* @param gen JsonGenerator to which the metrics should be written
* @throws IOException
*/
public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
+ /**
+ * As described in {@link addIOMetrics}, we want to distinguish incomplete values from 0.
+ * However, for API backward compatibility, incomplete metrics will still be represented by the 0 value and
+ * a boolean will indicate the completeness.
+ */
+
gen.writeObjectFieldStart("metrics");
- gen.writeNumberField("read-bytes", this.numBytesInLocal + this.numBytesInRemote);
+
+ Long numBytesIn = this.numBytesInLocal + this.numBytesInRemote;
+ gen.writeNumberField("read-bytes", numBytesIn);
+ gen.writeBooleanField("read-bytes-complete", (this.numBytesInLocalComplete && this.numBytesInRemoteComplete));
gen.writeNumberField("write-bytes", this.numBytesOut);
+ gen.writeBooleanField("write-bytes-complete", this.numBytesOutComplete);
gen.writeNumberField("read-records", this.numRecordsIn);
+ gen.writeBooleanField("read-records-complete", this.numRecordsInComplete);
gen.writeNumberField("write-records", this.numRecordsOut);
+ gen.writeBooleanField("write-records-complete", this.numRecordsOutComplete);
+
gen.writeEndObject();
}
}