You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/09 14:04:48 UTC

flink git commit: [FLINK-7575] [webui] Show "Fetching" instead of 0 when IO metrics are not yet retrieved

Repository: flink
Updated Branches:
  refs/heads/master e14caef37 -> c7968e9c2


[FLINK-7575] [webui] Show "Fetching" instead of 0 when IO metrics are not yet retrieved

This closes #4647.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7968e9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7968e9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7968e9c

Branch: refs/heads/master
Commit: c7968e9c2ad6601f9f38b673ead5bc71611a6fbd
Parents: e14caef
Author: James Lafa <ja...@data-artisans.com>
Authored: Tue Sep 5 11:33:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Oct 9 14:07:19 2017 +0200

----------------------------------------------------------------------
 .../jobs/job.plan.node-list.subtasks.jade       | 18 ++++-
 .../partials/jobs/job.plan.node.subtasks.jade   | 17 ++--
 .../jobs/job.plan.node.taskmanagers.jade        | 18 +++--
 .../jobs/job.plan.node-list.subtasks.html       |  8 +-
 .../partials/jobs/job.plan.node.subtasks.html   |  8 +-
 .../jobs/job.plan.node.taskmanagers.html        |  8 +-
 .../handler/legacy/metrics/MetricStore.java     |  4 +
 .../rest/handler/util/MutableIOMetrics.java     | 85 +++++++++++++++++---
 8 files changed, 125 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
index 3bf6f1b..1411f3f 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
@@ -45,10 +45,20 @@ table.table.table-body-hover.table-clickable.table-activable
         span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}") {{v.duration | humanizeDuration:true}}
 
       td.td-long {{ v.name | humanizeText }}
-      td(title="{{v.metrics['read-bytes']}} bytes") {{ v.metrics['read-bytes'] | humanizeBytes }}
-      td {{ v.metrics['read-records'] | number }}
-      td(title="{{v.metrics['write-bytes']}} bytes") {{ v.metrics['write-bytes'] | humanizeBytes }}
-      td {{ v.metrics['write-records'] | number }}
+
+      td
+        span(ng-if="v.metrics['read-bytes-complete']") {{ v.metrics['read-bytes'] | humanizeBytes }}
+        i(ng-if="!v.metrics['read-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
+      td
+        span(ng-if="v.metrics['read-records-complete']") {{ v.metrics['read-records'] | number }}
+        i(ng-if="!v.metrics['read-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
+      td
+        span(ng-if="v.metrics['write-bytes-complete']") {{ v.metrics['write-bytes'] | humanizeBytes }}
+        i(ng-if="!v.metrics['write-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
+      td
+        span(ng-if="v.metrics['write-records-complete']") {{ v.metrics['write-records'] | number }}
+        i(ng-if="!v.metrics['write-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
+
       td {{ v.parallelism }}
       td
         .label-group

http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
index 78b0999..432feb4 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
@@ -32,7 +32,6 @@ table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="subta
       th Attempt
       th Host
       th Status
-
   tbody
     tr(ng-repeat="subtask in subtasks | orderBy:'host'")
       td
@@ -41,17 +40,19 @@ table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="subta
         span(ng-if="subtask['end-time'] > -1") {{ subtask['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
       td
         span(ng-if="subtask.duration > -1" title="{{subtask.duration | humanizeDuration:false}}") {{subtask.duration | humanizeDuration:true}}
-
+        
       td
-        span(ng-if="subtask.metrics['read-bytes'] > -1" title="{{subtask.metrics['read-bytes']}} bytes")
-          | {{ subtask.metrics['read-bytes'] | humanizeBytes}}
+        span(ng-if="subtask.metrics['read-bytes-complete']") {{ subtask.metrics['read-bytes'] | humanizeBytes }}
+        i(ng-if="!subtask.metrics['read-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
       td
-        span(ng-if="subtask.metrics['read-records'] > -1") {{ subtask.metrics['read-records'] | number }}
+        span(ng-if="subtask.metrics['read-records-complete']") {{ subtask.metrics['read-records'] | number }}
+        i(ng-if="!subtask.metrics['read-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
       td
-        span(ng-if="subtask.metrics['write-bytes'] > -1" title="{{subtask.metrics['write-bytes']}} bytes")
-          | {{ subtask.metrics['write-bytes'] | humanizeBytes}}
+        span(ng-if="subtask.metrics['write-bytes-complete']") {{ subtask.metrics['write-bytes'] | humanizeBytes }}
+        i(ng-if="!subtask.metrics['write-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
       td
-        span(ng-if="subtask.metrics['write-records'] > -1") {{ subtask.metrics['write-records'] | number }}
+        span(ng-if="subtask.metrics['write-records-complete']") {{ subtask.metrics['write-records'] | number }}
+        i(ng-if="!subtask.metrics['write-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
 
       td {{ subtask.attempt + 1 }}
       td {{ subtask.host }}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
index 75a2031..336f48e 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
@@ -36,18 +36,20 @@ table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="taskm
       td
         span(ng-if="v['end-time'] > -1") {{ v['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
       td
-        span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}") {{v.duration | humanizeDuration:true}}
-
+        span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}") {{v.duration | humanizeDuration:true}}      
+      
       td
-        span(ng-if="tm.metrics['read-bytes'] > -1" title="{{tm.metrics['read-bytes']}} bytes")
-          | {{ tm.metrics['read-bytes'] | humanizeBytes}}
+        span(ng-if="tm.metrics['read-bytes-complete']") {{ tm.metrics['read-bytes'] | humanizeBytes }}
+        i(ng-if="!tm.metrics['read-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
       td
-        span(ng-if="tm.metrics['read-records'] > -1") {{ tm.metrics['read-records'] | number }}
+        span(ng-if="tm.metrics['read-records-complete']") {{ tm.metrics['read-records'] | number }}
+        i(ng-if="!tm.metrics['read-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
       td
-        span(ng-if="tm.metrics['write-bytes'] > -1" title="{{tm.metrics['write-bytes']}} bytes")
-          | {{ tm.metrics['write-bytes'] | humanizeBytes}}
+        span(ng-if="tm.metrics['write-bytes-complete']") {{ tm.metrics['write-bytes'] | humanizeBytes }}
+        i(ng-if="!tm.metrics['write-bytes-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
       td
-        span(ng-if="tm.metrics['write-records'] > -1") {{ tm.metrics['write-records'] | number }}
+        span(ng-if="tm.metrics['write-records-complete']") {{ tm.metrics['write-records'] | number }}
+        i(ng-if="!tm.metrics['write-records-complete']", class="fa fa-spinner fa-spin fa-fw", aria-hidden="true")
 
       td {{ tm.host }}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html
index 4dc8996..f960ebd 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.subtasks.html
@@ -43,10 +43,10 @@ limitations under the License.
       <td><span ng-if="v['end-time'] &gt; -1">{{ v['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
       <td><span ng-if="v.duration &gt; -1" title="{{v.duration | humanizeDuration:false}}">{{v.duration | humanizeDuration:true}}</span></td>
       <td class="td-long">{{ v.name | humanizeText }}</td>
-      <td title="{{v.metrics['read-bytes']}} bytes">{{ v.metrics['read-bytes'] | humanizeBytes }}</td>
-      <td>{{ v.metrics['read-records'] | number }}</td>
-      <td title="{{v.metrics['write-bytes']}} bytes">{{ v.metrics['write-bytes'] | humanizeBytes }}</td>
-      <td>{{ v.metrics['write-records'] | number }}</td>
+      <td><span ng-if="v.metrics['read-bytes-complete']">{{ v.metrics['read-bytes'] | humanizeBytes }}</span><i ng-if="!v.metrics['read-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+      <td><span ng-if="v.metrics['read-records-complete']">{{ v.metrics['read-records'] | number }}</span><i ng-if="!v.metrics['read-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+      <td><span ng-if="v.metrics['write-bytes-complete']">{{ v.metrics['write-bytes'] | humanizeBytes }}</span><i ng-if="!v.metrics['write-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+      <td><span ng-if="v.metrics['write-records-complete']">{{ v.metrics['write-records'] | number }}</span><i ng-if="!v.metrics['write-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
       <td>{{ v.parallelism }}</td>
       <td>
         <div class="label-group">

http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
index ae568e8..cb0e921 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
@@ -38,10 +38,10 @@ limitations under the License.
       <td><span ng-if="subtask['start-time'] &gt; -1">{{ subtask['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
       <td><span ng-if="subtask['end-time'] &gt; -1">{{ subtask['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
       <td><span ng-if="subtask.duration &gt; -1" title="{{subtask.duration | humanizeDuration:false}}">{{subtask.duration | humanizeDuration:true}}</span></td>
-      <td><span ng-if="subtask.metrics['read-bytes'] &gt; -1" title="{{subtask.metrics['read-bytes']}} bytes">{{ subtask.metrics['read-bytes'] | humanizeBytes}}</span></td>
-      <td><span ng-if="subtask.metrics['read-records'] &gt; -1">{{ subtask.metrics['read-records'] | number }}</span></td>
-      <td><span ng-if="subtask.metrics['write-bytes'] &gt; -1" title="{{subtask.metrics['write-bytes']}} bytes">{{ subtask.metrics['write-bytes'] | humanizeBytes}}</span></td>
-      <td><span ng-if="subtask.metrics['write-records'] &gt; -1">{{ subtask.metrics['write-records'] | number }}</span></td>
+      <td><span ng-if="subtask.metrics['read-bytes-complete']">{{ subtask.metrics['read-bytes'] | humanizeBytes }}</span><i ng-if="!subtask.metrics['read-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+      <td><span ng-if="subtask.metrics['read-records-complete']">{{ subtask.metrics['read-records'] | number }}</span><i ng-if="!subtask.metrics['read-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+      <td><span ng-if="subtask.metrics['write-bytes-complete']">{{ subtask.metrics['write-bytes'] | humanizeBytes }}</span><i ng-if="!subtask.metrics['write-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+      <td><span ng-if="subtask.metrics['write-records-complete']">{{ subtask.metrics['write-records'] | number }}</span><i ng-if="!subtask.metrics['write-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
       <td>{{ subtask.attempt + 1 }}</td>
       <td>{{ subtask.host }}</td>
       <td> 

http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html
index 39b30ee..0f19a9e 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.taskmanagers.html
@@ -37,10 +37,10 @@ limitations under the License.
       <td><span ng-if="v['start-time'] &gt; -1">{{ v['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
       <td><span ng-if="v['end-time'] &gt; -1">{{ v['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
       <td><span ng-if="v.duration &gt; -1" title="{{v.duration | humanizeDuration:false}}">{{v.duration | humanizeDuration:true}}</span></td>
-      <td><span ng-if="tm.metrics['read-bytes'] &gt; -1" title="{{tm.metrics['read-bytes']}} bytes">{{ tm.metrics['read-bytes'] | humanizeBytes}}</span></td>
-      <td><span ng-if="tm.metrics['read-records'] &gt; -1">{{ tm.metrics['read-records'] | number }}</span></td>
-      <td><span ng-if="tm.metrics['write-bytes'] &gt; -1" title="{{tm.metrics['write-bytes']}} bytes">{{ tm.metrics['write-bytes'] | humanizeBytes}}</span></td>
-      <td><span ng-if="tm.metrics['write-records'] &gt; -1">{{ tm.metrics['write-records'] | number }}</span></td>
+      <td><span ng-if="tm.metrics['read-bytes-complete']">{{ tm.metrics['read-bytes'] | humanizeBytes }}</span><i ng-if="!tm.metrics['read-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+      <td><span ng-if="tm.metrics['read-records-complete']">{{ tm.metrics['read-records'] | number }}</span><i ng-if="!tm.metrics['read-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+      <td><span ng-if="tm.metrics['write-bytes-complete']">{{ tm.metrics['write-bytes'] | humanizeBytes }}</span><i ng-if="!tm.metrics['write-bytes-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
+      <td><span ng-if="tm.metrics['write-records-complete']">{{ tm.metrics['write-records'] | number }}</span><i ng-if="!tm.metrics['write-records-complete']" aria-hidden="true" class="fa fa-spinner fa-spin fa-fw"></i></td>
       <td>{{ tm.host }}</td>
       <td>
         <div class="label-group">

http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 6d3fc99..9c13ab8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -250,6 +250,10 @@ public class MetricStore {
 	private abstract static class ComponentMetricStore {
 		public final Map<String, String> metrics = new HashMap<>();
 
+		public String getMetric(String name) {
+			return this.metrics.get(name);
+		}
+
 		public String getMetric(String name, String defaultValue) {
 			String value = this.metrics.get(name);
 			return value != null

http://git-wip-us.apache.org/repos/asf/flink/blob/c7968e9c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index e2aaaf7..2f5a7c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -44,6 +44,11 @@ import java.io.IOException;
 public class MutableIOMetrics extends IOMetrics {
 
 	private static final long serialVersionUID = -5460777634971381737L;
+	private boolean numBytesInLocalComplete = true;
+	private boolean numBytesInRemoteComplete = true;
+	private boolean numBytesOutComplete = true;
+	private boolean numRecordsInComplete = true;
+	private boolean numRecordsOutComplete = true;
 
 	public MutableIOMetrics() {
 		super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
@@ -72,13 +77,58 @@ public class MutableIOMetrics extends IOMetrics {
 		} else { // execAttempt is still running, use MetricQueryService instead
 			if (fetcher != null) {
 				fetcher.update();
-				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
-				if (metrics != null) {
-					this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
-					this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-					this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-					this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-					this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+				MetricStore metricStore = fetcher.getMetricStore();
+				synchronized (metricStore) {
+					MetricStore.SubtaskMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
+					if (metrics != null) {
+						/**
+						 * We want to keep track of missing metrics to be able to make a difference between 0 as a value
+						 * and a missing value.
+						 * In case a metric is missing for a parallel instance of a task, we set the complete flag as
+						 * false.
+						 */
+						if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL) == null){
+							this.numBytesInLocalComplete = false;
+						}
+						else {
+							this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL));
+						}
+
+						if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE) == null){
+							this.numBytesInRemoteComplete = false;
+						}
+						else {
+							this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE));
+						}
+
+						if (metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT) == null){
+							this.numBytesOutComplete = false;
+						}
+						else {
+							this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT));
+						}
+
+						if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN) == null){
+							this.numRecordsInComplete = false;
+						}
+						else {
+							this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN));
+						}
+
+						if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT) == null){
+							this.numRecordsOutComplete = false;
+						}
+						else {
+							this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT));
+						}
+					}
+					else {
+						this.numBytesInLocalComplete = false;
+						this.numBytesInRemoteComplete = false;
+						this.numBytesOutComplete = false;
+						this.numRecordsInComplete = false;
+						this.numRecordsOutComplete = false;
+					}
 				}
 			}
 		}
@@ -90,20 +140,37 @@ public class MutableIOMetrics extends IOMetrics {
 	 * <p>The JSON structure written is as follows:
 	 * "metrics": {
 	 *     "read-bytes": 1,
+	 *     "read-bytes-complete": true,
 	 *     "write-bytes": 2,
+	 *     "write-bytes-complete": true,
 	 *     "read-records": 3,
-	 *     "write-records": 4
+	 *     "read-records-complete": true,
+	 *     "write-records": 4,
+	 *     "write-records-complete": true
 	 * }
 	 *
 	 * @param gen JsonGenerator to which the metrics should be written
 	 * @throws IOException
 	 */
 	public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
+		/**
+		 * As described in {@link addIOMetrics}, we want to distinguish incomplete values from 0.
+		 * However, for API backward compatibility, incomplete metrics will still be represented by the 0 value and
+		 * a boolean will indicate the completeness.
+		 */
+
 		gen.writeObjectFieldStart("metrics");
-		gen.writeNumberField("read-bytes", this.numBytesInLocal + this.numBytesInRemote);
+
+		Long numBytesIn = this.numBytesInLocal + this.numBytesInRemote;
+		gen.writeNumberField("read-bytes", numBytesIn);
+		gen.writeBooleanField("read-bytes-complete", (this.numBytesInLocalComplete && this.numBytesInRemoteComplete));
 		gen.writeNumberField("write-bytes", this.numBytesOut);
+		gen.writeBooleanField("write-bytes-complete", this.numBytesOutComplete);
 		gen.writeNumberField("read-records", this.numRecordsIn);
+		gen.writeBooleanField("read-records-complete", this.numRecordsInComplete);
 		gen.writeNumberField("write-records", this.numRecordsOut);
+		gen.writeBooleanField("write-records-complete", this.numRecordsOutComplete);
+
 		gen.writeEndObject();
 	}
 }