You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/31 15:08:33 UTC

[5/5] flink git commit: [FLINK-4733] Port Task IO metrics

[FLINK-4733] Port Task IO metrics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba2d007e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba2d007e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba2d007e

Branch: refs/heads/master
Commit: ba2d007e5ad270b9a403d037d186de61cdaac742
Parents: cf4f364
Author: zentol <ch...@apache.org>
Authored: Mon Oct 31 14:17:05 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Oct 31 15:12:04 2016 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 12 +--
 .../webmonitor/handlers/JobDetailsHandler.java  | 53 +++++++++----
 .../handlers/JobVertexDetailsHandler.java       | 63 ++++++++-------
 .../handlers/JobVertexTaskManagersHandler.java  | 57 +++++++-------
 .../SubtaskCurrentAttemptDetailsHandler.java    |  5 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  | 63 ++++++++-------
 .../accumulators/AccumulatorRegistry.java       | 80 +------------------
 .../accumulators/AccumulatorSnapshot.java       | 15 ----
 .../runtime/executiongraph/AccessExecution.java | 15 +---
 .../executiongraph/AccessExecutionGraph.java    | 11 ---
 .../AccessExecutionJobVertex.java               | 13 ----
 .../executiongraph/ArchivedExecution.java       | 18 ++---
 .../executiongraph/ArchivedExecutionGraph.java  | 21 -----
 .../ArchivedExecutionJobVertex.java             |  8 --
 .../flink/runtime/executiongraph/Execution.java | 24 +++---
 .../runtime/executiongraph/ExecutionGraph.java  | 28 +------
 .../executiongraph/ExecutionJobVertex.java      | 33 --------
 .../api/reader/AbstractRecordReader.java        |  8 --
 .../io/network/api/reader/BufferReader.java     |  6 --
 .../io/network/api/reader/ReaderBase.java       |  6 --
 .../AdaptiveSpanningRecordDeserializer.java     | 31 --------
 .../api/serialization/RecordDeserializer.java   |  6 --
 .../api/serialization/RecordSerializer.java     |  6 --
 .../serialization/SpanningRecordSerializer.java | 13 ----
 ...llingAdaptiveSpanningRecordDeserializer.java | 31 --------
 .../io/network/api/writer/RecordWriter.java     | 10 ---
 .../iterative/task/IterationHeadTask.java       |  4 +-
 .../flink/runtime/operators/BatchTask.java      | 22 ++----
 .../flink/runtime/operators/DataSinkTask.java   |  6 --
 .../flink/runtime/operators/DataSourceTask.java |  6 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  4 +
 .../runtime/taskmanager/TaskExecutionState.java | 13 +++-
 .../flink/runtime/taskmanager/TaskManager.scala |  4 +-
 .../ArchivedExecutionGraphTest.java             | 23 ------
 .../ExecutionGraphDeploymentTest.java           |  7 +-
 .../network/api/reader/AbstractReaderTest.java  |  6 --
 .../testingUtils/TestingJobManagerLike.scala    |  3 +-
 .../TestingJobManagerMessages.scala             |  2 -
 .../runtime/io/StreamInputProcessor.java        |  7 --
 .../runtime/io/StreamTwoInputProcessor.java     |  7 --
 .../runtime/tasks/OneInputStreamTask.java       |  4 -
 .../streaming/runtime/tasks/OperatorChain.java  |  8 +-
 .../streaming/runtime/tasks/StreamTask.java     |  2 +-
 .../runtime/tasks/TwoInputStreamTask.java       |  4 -
 .../operators/StreamOperatorChainingTest.java   |  9 +--
 .../accumulators/AccumulatorLiveITCase.java     | 82 ++------------------
 46 files changed, 207 insertions(+), 652 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index a0afba2..7d2b5b6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -262,12 +262,12 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 			.GET("/jobs", handler(new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT)))
 
-			.GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs)))
+			.GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs, metricFetcher)))
+			.GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs, metricFetcher)))
 
-			.GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs, metricFetcher)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
-			.GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)))
 			.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(
@@ -276,8 +276,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 							refreshInterval)))
 			.GET("/jobs/:jobid/vertices/:vertexid/metrics", handler(new JobVertexMetricsHandler(metricFetcher)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
-			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher)))
+			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)))
 
 			.GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs)))

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index e7a2a8c..6de6dc5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -20,16 +20,16 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -45,9 +45,12 @@ import java.util.Map;
  * </ul>
  */
 public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
-	
-	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+
+	private final MetricFetcher fetcher;
+
+	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
 		super(executionGraphHolder);
+		this.fetcher = fetcher;
 	}
 
 	@Override
@@ -124,13 +127,6 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 			ExecutionState jobVertexState = 
 					ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
 			jobVerticesPerState[jobVertexState.ordinal()]++;
-			
-			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = ejv.getAggregatedMetricAccumulators();
-
-			LongCounter readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-			LongCounter writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-			LongCounter readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-			LongCounter writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
 
 			gen.writeStartObject();
 			gen.writeStringField("id", ejv.getJobVertexId().toString());
@@ -148,11 +144,36 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 			}
 			gen.writeEndObject();
 			
+			long numBytesIn = 0;
+			long numBytesOut = 0;
+			long numRecordsIn = 0;
+			long numRecordsOut = 0;
+
+			for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+				IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+				if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
+					numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+					numBytesOut += ioMetrics.getNumBytesOut();
+					numRecordsIn += ioMetrics.getNumRecordsIn();
+					numRecordsOut += ioMetrics.getNumRecordsOut();
+				} else { // execAttempt is still running, use MetricQueryService instead
+					fetcher.update();
+					MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
+					if (metrics != null) {
+						numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
+						numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
+						numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
+						numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+					}
+				}
+			}
+
 			gen.writeObjectFieldStart("metrics");
-			gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L);
-			gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L);
-			gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
-			gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
+			gen.writeNumberField("read-bytes", numBytesIn);
+			gen.writeNumberField("write-bytes", numBytesOut);
+			gen.writeNumberField("read-records", numRecordsIn);
+			gen.writeNumberField("write-records", numRecordsOut);
 			gen.writeEndObject();
 			
 			gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index fbdd86b..14dcd39 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -37,9 +37,12 @@ import java.util.Map;
  * and the runtime and metrics of all its subtasks.
  */
 public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
-	
-	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+
+	private final MetricFetcher fetcher;
+
+	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
 		super(executionGraphHolder);
+		this.fetcher = fetcher;
 	}
 
 	@Override
@@ -71,25 +74,6 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 			long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1;
 			long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
 			
-			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
-			LongCounter readBytes;
-			LongCounter writeBytes;
-			LongCounter readRecords;
-			LongCounter writeRecords;
-			
-			if (metrics != null) {
-				readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-				writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-				readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-				writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
-			}
-			else {
-				readBytes = null;
-				writeBytes = null;
-				readRecords = null;
-				writeRecords = null;
-			}
-			
 			gen.writeStartObject();
 			gen.writeNumberField("subtask", num);
 			gen.writeStringField("status", status.name());
@@ -99,11 +83,34 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 			gen.writeNumberField("end-time", endTime);
 			gen.writeNumberField("duration", duration);
 
+			IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+			long numBytesIn = 0;
+			long numBytesOut = 0;
+			long numRecordsIn = 0;
+			long numRecordsOut = 0;
+
+			if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
+				numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+				numBytesOut = ioMetrics.getNumBytesOut();
+				numRecordsIn = ioMetrics.getNumRecordsIn();
+				numRecordsOut = ioMetrics.getNumRecordsOut();
+			} else { // execAttempt is still running, use MetricQueryService instead
+				fetcher.update();
+				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
+				if (metrics != null) {
+					numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
+					numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
+					numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
+					numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+				}
+			}
+
 			gen.writeObjectFieldStart("metrics");
-			gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L);
-			gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L);
-			gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
-			gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
+			gen.writeNumberField("read-bytes", numBytesIn);
+			gen.writeNumberField("write-bytes", numBytesOut);
+			gen.writeNumberField("read-records", numRecordsIn);
+			gen.writeNumberField("write-records", numRecordsOut);
 			gen.writeEndObject();
 			
 			gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index 0e94334..c1fabf8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -19,15 +19,15 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -42,8 +42,11 @@ import java.util.Map.Entry;
  */
 public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler {
 
-	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder) {
+	private final MetricFetcher fetcher;
+
+	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
 		super(executionGraphHolder);
+		this.fetcher = fetcher;
 	}
 
 	@Override
@@ -88,10 +91,10 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 			long endTime = 0;
 			boolean allFinished = true;
 
-			LongCounter tmReadBytes = new LongCounter();
-			LongCounter tmWriteBytes = new LongCounter();
-			LongCounter tmReadRecords = new LongCounter();
-			LongCounter tmWriteRecords = new LongCounter();
+			long numBytesIn = 0;
+			long numBytesOut = 0;
+			long numRecordsIn = 0;
+			long numRecordsOut = 0;
 
 			for (AccessExecutionVertex vertex : taskVertices) {
 				final ExecutionState state = vertex.getExecutionState();
@@ -106,20 +109,22 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 				allFinished &= state.isTerminal();
 				endTime = Math.max(endTime, vertex.getStateTimestamp(state));
 
-				Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
-
-				if (metrics != null) {
-					LongCounter readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-					tmReadBytes.merge(readBytes);
-
-					LongCounter writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-					tmWriteBytes.merge(writeBytes);
-
-					LongCounter readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-					tmReadRecords.merge(readRecords);
-
-					LongCounter writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
-					tmWriteRecords.merge(writeRecords);
+				IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+				if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
+					numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+					numBytesOut += ioMetrics.getNumBytesOut();
+					numRecordsIn += ioMetrics.getNumRecordsIn();
+					numRecordsOut += ioMetrics.getNumRecordsOut();
+				} else { // execAttempt is still running, use MetricQueryService instead
+					fetcher.update();
+					MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), vertex.getParallelSubtaskIndex());
+					if (metrics != null) {
+						numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
+						numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0"));
+						numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
+						numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+					}
 				}
 			}
 
@@ -152,10 +157,10 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 			gen.writeNumberField("duration", duration);
 
 			gen.writeObjectFieldStart("metrics");
-			gen.writeNumberField("read-bytes", tmReadBytes.getLocalValuePrimitive());
-			gen.writeNumberField("write-bytes", tmWriteBytes.getLocalValuePrimitive());
-			gen.writeNumberField("read-records", tmReadRecords.getLocalValuePrimitive());
-			gen.writeNumberField("write-records", tmWriteRecords.getLocalValuePrimitive());
+			gen.writeNumberField("read-bytes", numBytesIn);
+			gen.writeNumberField("write-bytes", numBytesOut);
+			gen.writeNumberField("read-records", numRecordsIn);
+			gen.writeNumberField("write-records", numRecordsOut);
 			gen.writeEndObject();
 
 			gen.writeObjectFieldStart("status-counts");

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 811bea6..6d09513 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 
 import java.util.Map;
 
@@ -28,8 +29,8 @@ import java.util.Map;
  */
 public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler {
 	
-	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
+		super(executionGraphHolder, fetcher);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 3cc7376..ca9c7ad 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -20,13 +20,13 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -35,9 +35,12 @@ import java.util.Map;
  * Request handler providing details about a single task execution attempt.
  */
 public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler {
-	
-	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+
+	private final MetricFetcher fetcher;
+
+	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
 		super(executionGraphHolder);
+		this.fetcher = fetcher;
 	}
 
 	@Override
@@ -55,25 +58,6 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 		long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1;
 		long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
 
-		Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = execAttempt.getFlinkAccumulators();
-		LongCounter readBytes;
-		LongCounter writeBytes;
-		LongCounter readRecords;
-		LongCounter writeRecords;
-
-		if (metrics != null) {
-			readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-			writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-			readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-			writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
-		}
-		else {
-			readBytes = null;
-			writeBytes = null;
-			readRecords = null;
-			writeRecords = null;
-		}
-
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -86,11 +70,34 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 		gen.writeNumberField("end-time", endTime);
 		gen.writeNumberField("duration", duration);
 
+		IOMetrics ioMetrics = execAttempt.getIOMetrics();
+
+		long numBytesIn = 0;
+		long numBytesOut = 0;
+		long numRecordsIn = 0;
+		long numRecordsOut = 0;
+		
+		if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
+			numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+			numBytesOut = ioMetrics.getNumBytesOut();
+			numRecordsIn = ioMetrics.getNumRecordsIn();
+			numRecordsOut = ioMetrics.getNumRecordsOut();
+		} else { // execAttempt is still running, use MetricQueryService instead
+			fetcher.update();
+			MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), execAttempt.getParallelSubtaskIndex());
+			if (metrics != null) {
+				numBytesIn = Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
+				numBytesOut = Long.valueOf(metrics.getMetric("numBytesOut", "0"));
+				numRecordsIn = Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
+				numRecordsOut = Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+			}
+		}
+		
 		gen.writeObjectFieldStart("metrics");
-		gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L);
-		gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L);
-		gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L);
-		gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L);
+		gen.writeNumberField("read-bytes", numBytesIn);
+		gen.writeNumberField("write-bytes", numBytesOut);
+		gen.writeNumberField("read-records", numRecordsIn);
+		gen.writeNumberField("write-records", numRecordsOut);
 		gen.writeEndObject();
 
 		gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
index 44714e7..ce6cb1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
@@ -20,18 +20,16 @@ package org.apache.flink.runtime.accumulators;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
- * Main accumulator registry which encapsulates internal and user-defined accumulators.
+ * Main accumulator registry which encapsulates user-defined accumulators.
  */
 public class AccumulatorRegistry {
 
@@ -40,32 +38,13 @@ public class AccumulatorRegistry {
 	protected final JobID jobID;
 	protected final ExecutionAttemptID taskID;
 
-	/* Flink's internal Accumulator values stored for the executing task. */
-	private final Map<Metric, Accumulator<?, ?>> flinkAccumulators =
-			new HashMap<Metric, Accumulator<?, ?>>();
-
 	/* User-defined Accumulator values stored for the executing task. */
 	private final Map<String, Accumulator<?, ?>> userAccumulators =
 			new ConcurrentHashMap<>(4);
 
-	/* The reporter reference that is handed to the reporting tasks. */
-	private final ReadWriteReporter reporter;
-
-	/**
-	 * Flink metrics supported
-	 */
-	public enum Metric {
-		NUM_RECORDS_IN,
-		NUM_RECORDS_OUT,
-		NUM_BYTES_IN,
-		NUM_BYTES_OUT
-	}
-
-
 	public AccumulatorRegistry(JobID jobID, ExecutionAttemptID taskID) {
 		this.jobID = jobID;
 		this.taskID = taskID;
-		this.reporter = new ReadWriteReporter(flinkAccumulators);
 	}
 
 	/**
@@ -74,7 +53,7 @@ public class AccumulatorRegistry {
 	 */
 	public AccumulatorSnapshot getSnapshot() {
 		try {
-			return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators);
+			return new AccumulatorSnapshot(jobID, taskID, userAccumulators);
 		} catch (Throwable e) {
 			LOG.warn("Failed to serialize accumulators for task.", e);
 			return null;
@@ -88,59 +67,4 @@ public class AccumulatorRegistry {
 		return userAccumulators;
 	}
 
-	/**
-	 * Gets the reporter for flink internal metrics.
-	 */
-	public Reporter getReadWriteReporter() {
-		return reporter;
-	}
-
-	/**
-	 * Interface for Flink's internal accumulators.
-	 */
-	public interface Reporter {
-		void reportNumRecordsIn(long value);
-		void reportNumRecordsOut(long value);
-		void reportNumBytesIn(long value);
-		void reportNumBytesOut(long value);
-	}
-
-	/**
-	 * Accumulator based reporter for keeping track of internal metrics (e.g. bytes and records in/out)
-	 */
-	private static class ReadWriteReporter implements Reporter {
-
-		private LongCounter numRecordsIn = new LongCounter();
-		private LongCounter numRecordsOut = new LongCounter();
-		private LongCounter numBytesIn = new LongCounter();
-		private LongCounter numBytesOut = new LongCounter();
-
-		private ReadWriteReporter(Map<Metric, Accumulator<?,?>> accumulatorMap) {
-			accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
-			accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut);
-			accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
-			accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut);
-		}
-
-		@Override
-		public void reportNumRecordsIn(long value) {
-			numRecordsIn.add(value);
-		}
-
-		@Override
-		public void reportNumRecordsOut(long value) {
-			numRecordsOut.add(value);
-		}
-
-		@Override
-		public void reportNumBytesIn(long value) {
-			numBytesIn.add(value);
-		}
-
-		@Override
-		public void reportNumBytesOut(long value) {
-			numBytesOut.add(value);
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
index d0f4bad..0bfb1ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
@@ -40,21 +40,14 @@ public class AccumulatorSnapshot implements Serializable {
 	private final ExecutionAttemptID executionAttemptID;
 
 	/**
-	 * Flink internal accumulators which can be deserialized using the system class loader.
-	 */
-	private final SerializedValue<Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators;
-
-	/**
 	 * Serialized user accumulators which may require the custom user class loader.
 	 */
 	private final SerializedValue<Map<String, Accumulator<?, ?>>> userAccumulators;
 
 	public AccumulatorSnapshot(JobID jobID, ExecutionAttemptID executionAttemptID,
-							Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
 							Map<String, Accumulator<?, ?>> userAccumulators) throws IOException {
 		this.jobID = jobID;
 		this.executionAttemptID = executionAttemptID;
-		this.flinkAccumulators = new SerializedValue<Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>(flinkAccumulators);
 		this.userAccumulators = new SerializedValue<Map<String, Accumulator<?, ?>>>(userAccumulators);
 	}
 
@@ -67,14 +60,6 @@ public class AccumulatorSnapshot implements Serializable {
 	}
 
 	/**
-	 * Gets the Flink (internal) accumulators values.
-	 * @return the serialized map
-	 */
-	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> deserializeFlinkAccumulators() throws IOException, ClassNotFoundException {
-		return flinkAccumulators.deserializeValue(getClass().getClassLoader());
-	}
-
-	/**
 	 * Gets the user-defined accumulators values.
 	 * @return the serialized map
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
index aefc17d..df558c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -17,14 +17,10 @@
  */
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
-import java.util.Map;
-
 /**
  * Common interface for the runtime {@link Execution and {@link ArchivedExecution}.
  */
@@ -88,18 +84,11 @@ public interface AccessExecution {
 	StringifiedAccumulatorResult[] getUserAccumulatorsStringified();
 
 	/**
-	 * Returns the system-defined accumulators.
-	 *
-	 * @return system-defined accumulators.
-	 * @deprecated Will be removed in FLINK-4527
-	 */
-	@Deprecated
-	Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators();
-
-	/**
 	 * Returns the subtask index of this execution.
 	 *
 	 * @return subtask index of this execution.
 	 */
 	int getParallelSubtaskIndex();
+
+	IOMetrics getIOMetrics();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 0fd97da..e7fe1b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -18,8 +18,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
@@ -153,15 +151,6 @@ public interface AccessExecutionGraph {
 	Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException;
 
 	/**
-	 * Returns the aggregated system-defined accumulators.
-	 *
-	 * @return aggregated system-defined accumulators.
-	 * @deprecated Will be removed in FLINK-4527
-	 */
-	@Deprecated
-	Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators();
-
-	/**
 	 * Returns whether this execution graph was archived.
 	 *
 	 * @return true, if the execution graph was archived, false otherwise

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
index c9bf604..92af0c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
@@ -17,16 +17,12 @@
  */
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import scala.Option;
 
-import java.util.Map;
-
 /**
  * Common interface for the runtime {@link ExecutionJobVertex} and {@link ArchivedExecutionJobVertex}.
  */
@@ -81,15 +77,6 @@ public interface AccessExecutionJobVertex {
 	Option<OperatorCheckpointStats> getCheckpointStats();
 
 	/**
-	 * Returns the aggregated system-defined accumulators.
-	 *
-	 * @return aggregated system-defined accumulators.
-	 * @deprecated Will be removed in FLINK-4527
-	 */
-	@Deprecated
-	Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators();
-
-	/**
 	 * Returns the aggregated user-defined accumulators as strings.
 	 *
 	 * @return aggregated user-defined accumulators as strings.

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index 0b2992f..c189d42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.Serializable;
-import java.util.Map;
 
 public class ArchivedExecution implements AccessExecution, Serializable {
 	private static final long serialVersionUID = 4817108757483345173L;
@@ -46,13 +43,12 @@ public class ArchivedExecution implements AccessExecution, Serializable {
 	/* Continuously updated map of user-defined accumulators */
 	private final StringifiedAccumulatorResult[] userAccumulators;
 
-	/* Continuously updated map of internal accumulators */
-	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
 	private final int parallelSubtaskIndex;
 
+	private final IOMetrics ioMetrics;
+
 	public ArchivedExecution(Execution execution) {
 		this.userAccumulators = execution.getUserAccumulatorsStringified();
-		this.flinkAccumulators = execution.getFlinkAccumulators();
 		this.attemptId = execution.getAttemptId();
 		this.attemptNumber = execution.getAttemptNumber();
 		this.stateTimestamps = execution.getStateTimestamps();
@@ -60,6 +56,7 @@ public class ArchivedExecution implements AccessExecution, Serializable {
 		this.state = execution.getState();
 		this.failureCause = ExceptionUtils.stringifyException(execution.getFailureCause());
 		this.assignedResourceLocation = execution.getAssignedResourceLocation();
+		this.ioMetrics = execution.getIOMetrics();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -106,13 +103,14 @@ public class ArchivedExecution implements AccessExecution, Serializable {
 		return userAccumulators;
 	}
 
-	@Override
-	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
-		return flinkAccumulators;
-	}
 
 	@Override
 	public int getParallelSubtaskIndex() {
 		return parallelSubtaskIndex;
 	}
+
+	@Override
+	public IOMetrics getIOMetrics() {
+		return ioMetrics;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index d8c58c8..0bd5319 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -19,8 +19,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -31,7 +29,6 @@ import org.apache.flink.util.SerializedValue;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -208,24 +205,6 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 		return tracker;
 	}
 
-	/**
-	 * Gets the internal flink accumulator map of maps which contains some metrics.
-	 *
-	 * @return A map of accumulators for every executed task.
-	 */
-	@Override
-	public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators() {
-		Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
-			new HashMap<>();
-
-		for (AccessExecutionVertex vertex : getAllExecutionVertices()) {
-			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
-			flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskAccs);
-		}
-
-		return flinkAccumulators;
-	}
-
 	@Override
 	public boolean isArchived() {
 		return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
index 4857bf5..8ae6bbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -19,7 +19,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
@@ -46,7 +45,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 
 	private final int maxParallelism;
 
-	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> aggregatedMetricAccumulators;
 	private final Option<OperatorCheckpointStats> checkpointStats;
 	private final StringifiedAccumulatorResult[] archivedUserAccumulators;
 
@@ -56,8 +54,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 			taskVertices[x] = jobVertex.getTaskVertices()[x].archive();
 		}
 
-		aggregatedMetricAccumulators = jobVertex.getAggregatedMetricAccumulators();
-
 		Map<String, Accumulator<?, ?>> tmpArchivedUserAccumulators = new HashMap<>();
 		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
 			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
@@ -116,10 +112,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 		return getAggregateJobVertexState(num, parallelism);
 	}
 
-	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() {
-		return this.aggregatedMetricAccumulators;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Static / pre-assigned input splits
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 17e0df1..788dee4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,7 +23,6 @@ import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -143,9 +142,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	/* Continuously updated map of user-defined accumulators */
 	private volatile Map<String, Accumulator<?, ?>> userAccumulators;
-
-	/* Continuously updated map of internal accumulators */
-	private volatile Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+	private IOMetrics ioMetrics;
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -651,7 +648,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		markFinished(null, null);
 	}
 
-	void markFinished(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators, Map<String, Accumulator<?, ?>> userAccumulators) {
+	void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
 
 		// this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!)
 		while (true) {
@@ -673,9 +670,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 						}
 
 						synchronized (accumulatorLock) {
-							this.flinkAccumulators = flinkAccumulators;
 							this.userAccumulators = userAccumulators;
 						}
+						this.ioMetrics = metrics;
 
 						assignedResource.releaseSlot();
 						vertex.getExecutionGraph().deregisterExecution(this);
@@ -1010,14 +1007,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	
 	/**
 	 * Update accumulators (discarded when the Execution has already been terminated).
-	 * @param flinkAccumulators the flink internal accumulators
 	 * @param userAccumulators the user accumulators
 	 */
-	public void setAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
-								Map<String, Accumulator<?, ?>> userAccumulators) {
+	public void setAccumulators(Map<String, Accumulator<?, ?>> userAccumulators) {
 		synchronized (accumulatorLock) {
 			if (!state.isTerminal()) {
-				this.flinkAccumulators = flinkAccumulators;
 				this.userAccumulators = userAccumulators;
 			}
 		}
@@ -1033,14 +1027,14 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	@Override
-	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
-		return flinkAccumulators;
-	}
-
-	@Override
 	public int getParallelSubtaskIndex() {
 		return getVertex().getParallelSubtaskIndex();
 	}
+		
+	@Override
+	public IOMetrics getIOMetrics() {
+		return ioMetrics;
+	}
 
 	// ------------------------------------------------------------------------
 	//  Standard utilities

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0a79cf2..074a04d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -29,7 +29,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -600,23 +599,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	/**
-	 * Gets the internal flink accumulator map of maps which contains some metrics.
-	 * @return A map of accumulators for every executed task.
-	 */
-	@Override
-	public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> getFlinkAccumulators() {
-		Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
-				new HashMap<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>();
-
-		for (ExecutionVertex vertex : getAllExecutionVertices()) {
-			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
-			flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskAccs);
-		}
-
-		return flinkAccumulators;
-	}
-
-	/**
 	 * Merges all accumulator results from the tasks previously executed in the Executions.
 	 * @return The accumulator map
 	 */
@@ -1075,7 +1057,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	/**
 	 * Updates the state of one of the ExecutionVertex's Execution attempts.
-	 * If the new status if "FINISHED", this also updates the
+	 * If the new status is "FINISHED", this also updates the accumulators.
 	 * 
 	 * @param state The state update.
 	 * @return True, if the task update was properly applied, false, if the execution attempt was not found.
@@ -1090,11 +1072,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				case FINISHED:
 					try {
 						AccumulatorSnapshot accumulators = state.getAccumulators();
-						Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators =
-							accumulators.deserializeFlinkAccumulators();
 						Map<String, Accumulator<?, ?>> userAccumulators =
 							accumulators.deserializeUserAccumulators(userClassLoader);
-						attempt.markFinished(flinkAccumulators, userAccumulators);
+						attempt.markFinished(userAccumulators, state.getIOMetrics());
 					}
 					catch (Exception e) {
 						LOG.error("Failed to deserialize final accumulator results.", e);
@@ -1160,16 +1140,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * @param accumulatorSnapshot The serialized flink and user-defined accumulators
 	 */
 	public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
-		Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
 		Map<String, Accumulator<?, ?>> userAccumulators;
 		try {
-			flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators();
 			userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
 
 			ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID();
 			Execution execution = currentExecutions.get(execID);
 			if (execution != null) {
-				execution.setAccumulators(flinkAccumulators, userAccumulators);
+				execution.setAccumulators(userAccumulators);
 			} else {
 				LOG.warn("Received accumulator result for unknown execution {}.", execID);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e7f16a2..2d9ec88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -20,13 +20,11 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
@@ -443,37 +441,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	// --------------------------------------------------------------------------------------------
 	//  Accumulators / Metrics
 	// --------------------------------------------------------------------------------------------
-	
-	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() {
-		// some specialized code to speed things up
-		long bytesRead = 0;
-		long bytesWritten = 0;
-		long recordsRead = 0;
-		long recordsWritten = 0;
-		
-		for (ExecutionVertex v : getTaskVertices()) {
-			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = v.getCurrentExecutionAttempt().getFlinkAccumulators();
-			
-			if (metrics != null) {
-				LongCounter br = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-				LongCounter bw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-				LongCounter rr = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-				LongCounter rw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
-				
-				bytesRead += br != null ? br.getLocalValuePrimitive() : 0;
-				bytesWritten += bw != null ? bw.getLocalValuePrimitive() : 0;
-				recordsRead += rr != null ? rr.getLocalValuePrimitive() : 0;
-				recordsWritten += rw != null ? rw.getLocalValuePrimitive() : 0;
-			}
-		}
-
-		HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>> agg = new HashMap<>();
-		agg.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, new LongCounter(bytesRead));
-		agg.put(AccumulatorRegistry.Metric.NUM_BYTES_OUT, new LongCounter(bytesWritten));
-		agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_IN, new LongCounter(recordsRead));
-		agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_OUT, new LongCounter(recordsWritten));
-		return agg;
-	}
 
 	public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
 		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index e0fe355..c5aeef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
@@ -123,11 +122,4 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 			}
 		}
 	}
-
-	@Override
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			deserializer.setReporter(reporter);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index debb352..ca59609 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -48,9 +47,4 @@ public final class BufferReader extends AbstractReader {
 			}
 		}
 	}
-
-	@Override
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index a1d705f..0cc77f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import java.io.IOException;
 
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
@@ -52,9 +51,4 @@ public interface ReaderBase {
 
 	boolean hasReachedEndOfSuperstep();
 
-	/**
-	 * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read.
-	 */
-	void setReporter(AccumulatorRegistry.Reporter reporter);
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
index cdd8731..8f2c8fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -46,8 +45,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 
 	private Buffer currentBuffer;
 
-	private AccumulatorRegistry.Reporter reporter;
-
 	public AdaptiveSpanningRecordDeserializer() {
 		this.nonSpanningWrapper = new NonSpanningWrapper();
 		this.spanningWrapper = new SpanningWrapper();
@@ -93,18 +90,10 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		if (nonSpanningRemaining >= 4) {
 			int len = this.nonSpanningWrapper.readInt();
 
-			if (reporter != null) {
-				reporter.reportNumBytesIn(len);
-			}
-
 			if (len <= nonSpanningRemaining - 4) {
 				// we can get a full record from here
 				target.read(this.nonSpanningWrapper);
 
-				if (reporter != null) {
-					reporter.reportNumRecordsIn(1);
-				}
-
 				return (this.nonSpanningWrapper.remaining() == 0) ?
 						DeserializationResult.LAST_RECORD_FROM_BUFFER :
 						DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -128,10 +117,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 			// get the full record
 			target.read(this.spanningWrapper);
 
-			if (reporter != null) {
-				reporter.reportNumRecordsIn(1);
-			}
-
 			// move the remainder to the non-spanning wrapper
 			// this does not copy it, only sets the memory segment
 			this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
@@ -159,12 +144,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
 	}
 
-	@Override
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-		this.reporter = reporter;
-		this.spanningWrapper.setReporter(reporter);
-	}
-
 	// -----------------------------------------------------------------------------------------------------------------
 
 	private static final class NonSpanningWrapper implements DataInputView {
@@ -447,8 +426,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 
 		private int recordLimit;
 
-		private AccumulatorRegistry.Reporter reporter;
-
 		public SpanningWrapper() {
 			this.lengthBuffer = ByteBuffer.allocate(4);
 			this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
@@ -486,10 +463,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 				} else {
 					this.recordLength = this.lengthBuffer.getInt(0);
 
-					if (reporter != null) {
-						reporter.reportNumBytesIn(this.recordLength);
-					}
-
 					this.lengthBuffer.clear();
 					segmentPosition = toPut;
 				}
@@ -634,9 +607,5 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		public int read(byte[] b) throws IOException {
 			return this.serializationReadBuffer.read(b);
 		}
-
-		public void setReporter(AccumulatorRegistry.Reporter reporter) {
-			this.reporter = reporter;
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index e4c7890..dd8ea06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 /**
@@ -65,9 +64,4 @@ public interface RecordDeserializer<T extends IOReadableWritable> {
 	void clear();
 	
 	boolean hasUnfinishedData();
-
-	/**
-	 * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read.
-	 */
-	void setReporter(AccumulatorRegistry.Reporter reporter);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index c76dd00..e8179dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 /**
@@ -67,11 +66,6 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	boolean hasData();
 
 	/**
-	 * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read.
-	 */
-	void setReporter(AccumulatorRegistry.Reporter reporter);
-
-	/**
 	 * Instantiates all metrics.
 	 *
 	 * @param metrics metric group

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 7c4d937..e36a16f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -26,7 +26,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
@@ -53,8 +52,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	/** Limit of current {@link MemorySegment} of target buffer */
 	private int limit;
 
-	private AccumulatorRegistry.Reporter reporter;
-
 	private transient Counter numBytesOut;
 
 	public SpanningRecordSerializer() {
@@ -84,11 +81,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 		int len = this.serializationBuffer.length();
 		this.lengthBuffer.putInt(0, len);
-
-		if (reporter != null) {
-			reporter.reportNumBytesOut(len);
-			reporter.reportNumRecordsOut(1);
-		}
 		
 		if (numBytesOut != null) {
 			numBytesOut.inc(len);
@@ -192,11 +184,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	}
 
 	@Override
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-		this.reporter = reporter;
-	}
-
-	@Override
 	public void instantiateMetrics(TaskIOMetricGroup metrics) {
 		numBytesOut = metrics.getNumBytesOutCounter();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index eab8e7c..7c213b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.util.StringUtils;
@@ -59,8 +58,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	private Buffer currentBuffer;
 
-	private AccumulatorRegistry.Reporter reporter;
-
 	public SpillingAdaptiveSpanningRecordDeserializer(String[] tmpDirectories) {
 		this.nonSpanningWrapper = new NonSpanningWrapper();
 		this.spanningWrapper = new SpanningWrapper(tmpDirectories);
@@ -106,19 +103,11 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		if (nonSpanningRemaining >= 4) {
 			int len = this.nonSpanningWrapper.readInt();
 
-			if (reporter != null) {
-				reporter.reportNumBytesIn(len);
-			}
-
 			if (len <= nonSpanningRemaining - 4) {
 				// we can get a full record from here
 				try {
 					target.read(this.nonSpanningWrapper);
 
-					if (reporter != null) {
-						reporter.reportNumRecordsIn(1);
-					}
-
 					int remaining = this.nonSpanningWrapper.remaining();
 					if (remaining > 0) {
 						return DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -153,10 +142,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		if (this.spanningWrapper.hasFullRecord()) {
 			// get the full record
 			target.read(this.spanningWrapper.getInputView());
-
-			if (reporter != null) {
-				reporter.reportNumRecordsIn(1);
-			}
 			
 			// move the remainder to the non-spanning wrapper
 			// this does not copy it, only sets the memory segment
@@ -182,12 +167,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
 	}
 
-	@Override
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-		this.reporter = reporter;
-		this.spanningWrapper.setReporter(reporter);
-	}
-
 
 	// -----------------------------------------------------------------------------------------------------------------
 	
@@ -483,8 +462,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		
 		private DataInputViewStreamWrapper spillFileReader;
 
-		private AccumulatorRegistry.Reporter reporter;
-
 		public SpanningWrapper(String[] tempDirs) {
 			this.tempDirs = tempDirs;
 			
@@ -538,10 +515,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				} else {
 					this.recordLength = this.lengthBuffer.getInt(0);
 
-					if (reporter != null) {
-						reporter.reportNumBytesIn(recordLength);
-					}
-
 					this.lengthBuffer.clear();
 					segmentPosition = toPut;
 					
@@ -672,9 +645,5 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			random.nextBytes(bytes);
 			return StringUtils.byteToHexString(bytes);
 		}
-
-		public void setReporter(AccumulatorRegistry.Reporter reporter) {
-			this.reporter = reporter;
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 96eea23..1e224c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
@@ -201,15 +200,6 @@ public class RecordWriter<T extends IOReadableWritable> {
 	}
 
 	/**
-	 * Counter for the number of records emitted and the records processed.
-	 */
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-		for(RecordSerializer<?> serializer : serializers) {
-			serializer.setReporter(reporter);
-		}
-	}
-
-	/**
 	 * Sets the metric group for this RecordWriter.
 	 * @param metrics
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 4bc4532..2e3285c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.operators.Driver;
@@ -114,9 +113,8 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 		List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
 		final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
 		final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-		AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
 		this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
-				userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
+				userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());
 
 		// sanity check the setup
 		final int writersIntoStepFunction = this.eventualOutputs.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index e896639..f748079 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -654,9 +653,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 
 		int currentReaderOffset = 0;
 
-		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-		AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-
 		for (int i = 0; i < numInputs; i++) {
 			//  ---------------- create the input readers ---------------------
 			// in case where a logical input unions multiple physical inputs, create a union reader
@@ -680,8 +676,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 				throw new Exception("Illegal input group size in task configuration: " + groupSize);
 			}
 
-			inputReaders[i].setReporter(reporter);
-
 			currentReaderOffset += groupSize;
 		}
 		this.inputReaders = inputReaders;
@@ -1015,13 +1009,10 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 
 		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 
-		AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
-		AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
-
-		this.accumulatorMap = accumulatorRegistry.getUserMap();
+		this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
 
 		this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs,
-				this.getExecutionConfig(), reporter, this.accumulatorMap);
+				this.getExecutionConfig(), this.accumulatorMap);
 	}
 
 	public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) {
@@ -1215,7 +1206,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 	 * @return The OutputCollector that data produced in this task is submitted to.
 	 */
 	public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl,
-			List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws Exception
+			List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs) throws Exception
 	{
 		if (numOutputs == 0) {
 			return null;
@@ -1248,8 +1239,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 			final RecordWriter<SerializationDelegate<T>> recordWriter =
 					new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);
 
-			// setup live accumulator counters
-			recordWriter.setReporter(reporter);
 			recordWriter.setMetricGroup(task.getEnvironment().getMetricGroup().getIOMetricGroup());
 
 			writers.add(recordWriter);
@@ -1269,7 +1258,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 										List<ChainedDriver<?, ?>> chainedTasksTarget,
 										List<RecordWriter<?>> eventualOutputs,
 										ExecutionConfig executionConfig,
-										AccumulatorRegistry.Reporter reporter,
 										Map<String, Accumulator<?,?>> accumulatorMap)
 	throws Exception
 	{
@@ -1304,7 +1292,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 
 				if (i == numChained - 1) {
 					// last in chain, instantiate the output collector for this task
-					previous = getOutputCollector(containingTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter);
+					previous = getOutputCollector(containingTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs());
 				}
 
 				ct.setup(chainedStubConf, taskName, previous, containingTask, cl, executionConfig, accumulatorMap);
@@ -1322,7 +1310,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 		// else
 
 		// instantiate the output collector the default way from this configuration
-		return getOutputCollector(containingTask , config, cl, eventualOutputs, 0, numOutputs, reporter);
+		return getOutputCollector(containingTask , config, cl, eventualOutputs, 0, numOutputs);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index eb7999c..bd052f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
@@ -354,11 +353,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		} else {
 			throw new Exception("Illegal input group size in task configuration: " + groupSize);
 		}
-
-		final AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
-		final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
-
-		inputReader.setReporter(reporter);
 		
 		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
 		@SuppressWarnings({ "rawtypes" })