You are viewing a plain-text version of this message; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/16 10:34:20 UTC

[1/2] flink git commit: [FLINK-3949] Collect Metrics in Runtime Operators

Repository: flink
Updated Branches:
  refs/heads/master 104958523 -> c78b3c49e


http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
index f51cb68..86be7b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
@@ -76,6 +76,7 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	@Override
 	public void collect(IT record) {
 		try {
+			this.numRecordsIn.inc();
 			this.mapper.flatMap(record, this.outputCollector);
 		} catch (Exception ex) {
 			throw new ExceptionInChainedStubException(this.taskName, ex);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
index 9b888f2..cef1b73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@@ -75,6 +75,7 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	@Override
 	public void collect(IT record) {
 		try {
+			this.numRecordsIn.inc();
 			this.outputCollector.collect(this.mapper.map(record));
 		} catch (Exception ex) {
 			throw new ExceptionInChainedStubException(this.taskName, ex);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
index 3912b98..e3de1c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
@@ -59,6 +59,7 @@ public class ChainedTerminationCriterionDriver<IT, OT> extends ChainedDriver<IT,
 
 	@Override
 	public void collect(IT record) {
+		numRecordsIn.inc();
 		agg.aggregate(1);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
index 63f2b20..e6c8c2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -167,6 +167,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 
 	@Override
 	public void collect(IN record) {
+		numRecordsIn.inc();
 		// try writing to the sorter first
 		try {
 			if (this.sorter.write(record)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 408abc2..a003d9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -163,6 +163,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 
 	@Override
 	public void collect(IN record) {
+		this.numRecordsIn.inc();
 		// try writing to the sorter first
 		try {
 			if (this.sorter.write(record)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
new file mode 100644
index 0000000..f7a1df9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Collector;
+
+public class CountingCollector<OUT> implements Collector<OUT> {
+	private final Collector<OUT> collector;
+	private final Counter numRecordsOut;
+
+	public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
+		this.collector = collector;
+		this.numRecordsOut = numRecordsOut;
+	}
+
+	@Override
+	public void collect(OUT record) {
+		this.numRecordsOut.inc();
+		this.collector.collect(record);
+	}
+
+	@Override
+	public void close() {
+		this.collector.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
new file mode 100644
index 0000000..7494108
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.Iterator;
+
+public class CountingIterable<IN> implements Iterable<IN> {
+
+	private final Iterable<IN> iterable;
+	private final Counter numRecordsIn;
+
+	public CountingIterable(Iterable<IN> iterable, Counter numRecordsIn) {
+		this.iterable = iterable;
+		this.numRecordsIn = numRecordsIn;
+	}
+
+	@Override
+	public Iterator<IN> iterator() {
+		return new CountingIterator<>(this.iterable.iterator(), this.numRecordsIn);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
new file mode 100644
index 0000000..fe89358
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingIterator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.Iterator;
+
+public class CountingIterator<IN> implements Iterator<IN> {
+	private final Iterator<IN> iterator;
+	private final Counter numRecordsIn;
+
+	public CountingIterator(Iterator<IN> iterator, Counter numRecordsIn) {
+		this.iterator = iterator;
+		this.numRecordsIn = numRecordsIn;
+	}
+
+	@Override
+	public boolean hasNext() {
+		return this.iterator.hasNext();
+	}
+
+	@Override
+	public IN next() {
+		numRecordsIn.inc();
+		return this.iterator.next();
+	}
+
+	@Override
+	public void remove() {
+		this.iterator.remove();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
new file mode 100644
index 0000000..e4b436a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/metrics/CountingMutableObjectIterator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+
+public class CountingMutableObjectIterator<IN> implements MutableObjectIterator<IN> {
+	private final MutableObjectIterator<IN> iterator;
+	private final Counter numRecordsIn;
+
+	public CountingMutableObjectIterator(MutableObjectIterator<IN> iterator, Counter numRecordsIn) {
+		this.iterator = iterator;
+		this.numRecordsIn = numRecordsIn;
+	}
+
+	@Override
+	public IN next(IN reuse) throws IOException {
+		IN next = iterator.next(reuse);
+		if (next != null) {
+			numRecordsIn.inc();
+		}
+		return next;
+	}
+
+	@Override
+	public IN next() throws IOException {
+		IN next = iterator.next();
+		if (next != null) {
+			numRecordsIn.inc();
+		}
+		return next;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 017b16b..58eb90c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -319,7 +319,8 @@ public class Task implements Runnable {
 
 		for (int i = 0; i < this.inputGates.length; i++) {
 			SingleInputGate gate = SingleInputGate.create(
-					taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment);
+					taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment, 
+					metricGroup.getIOMetricGroup());
 
 			this.inputGates[i] = gate;
 			inputGatesById.put(gate.getConsumedResultId(), gate);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 9724a80..6853722 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -190,9 +189,5 @@ public class AbstractReaderTest {
 		public void setReporter(AccumulatorRegistry.Reporter reporter) {
 
 		}
-
-		@Override
-		public void setMetricGroup(IOMetricGroup metrics) {
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index 9717530..da15f08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -118,7 +119,7 @@ public class InputChannelTest {
 				ResultPartitionID partitionId,
 				Tuple2<Integer, Integer> initialAndMaxBackoff) {
 
-			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
+			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 8852b4f..f91a4ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -271,7 +272,8 @@ public class LocalInputChannelTest {
 				new ResultPartitionID(),
 				partitionManager,
 				mock(TaskEventDispatcher.class),
-				initialAndMaxRequestBackoff);
+				initialAndMaxRequestBackoff,
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 	}
 
 	/**
@@ -346,7 +348,8 @@ public class LocalInputChannelTest {
 					new IntermediateDataSetID(),
 					subpartitionIndex,
 					numberOfInputChannels,
-					mock(PartitionStateChecker.class));
+					mock(PartitionStateChecker.class),
+					new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);
@@ -360,7 +363,8 @@ public class LocalInputChannelTest {
 								i,
 								consumedPartitionIds[i],
 								partitionManager,
-								taskEventDispatcher));
+								taskEventDispatcher,
+								new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()));
 			}
 
 			this.numberOfInputChannels = numberOfInputChannels;

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index c484cc4..9eb49ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import scala.Tuple2;
 
@@ -247,7 +248,8 @@ public class RemoteInputChannelTest {
 				0,
 				partitionId,
 				mock(ConnectionID.class),
-				connectionManager);
+				connectionManager,
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		ch.onFailedPartitionRequest();
 
@@ -266,7 +268,8 @@ public class RemoteInputChannelTest {
 				0,
 				new ResultPartitionID(),
 				mock(ConnectionID.class),
-				connManager);
+				connManager,
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
 
@@ -301,6 +304,7 @@ public class RemoteInputChannelTest {
 				new ResultPartitionID(),
 				mock(ConnectionID.class),
 				connectionManager,
-				initialAndMaxRequestBackoff);
+				initialAndMaxRequestBackoff,
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index c4bb785..05427a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import scala.Tuple2;
 
@@ -66,7 +67,7 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class));
+				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 				new TestInputChannel(inputGate, 0),
@@ -113,7 +114,7 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class));
+		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -122,12 +123,12 @@ public class SingleInputGateTest {
 		// Local
 		ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher);
+		InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		// Unknown
 		ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0));
+		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		// Set channels
 		inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
@@ -168,7 +169,7 @@ public class SingleInputGateTest {
 				new IntermediateDataSetID(),
 				0,
 				1,
-				mock(PartitionStateChecker.class));
+				mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
@@ -179,7 +180,7 @@ public class SingleInputGateTest {
 				partitionManager,
 				new TaskEventDispatcher(),
 				new LocalConnectionManager(),
-				new Tuple2<>(0, 0));
+				new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 
@@ -208,7 +209,8 @@ public class SingleInputGateTest {
 				new IntermediateDataSetID(),
 				0,
 				1,
-				mock(PartitionStateChecker.class));
+				mock(PartitionStateChecker.class),
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		InputChannel unknown = new UnknownInputChannel(
 				inputGate,
@@ -217,7 +219,8 @@ public class SingleInputGateTest {
 				new ResultPartitionManager(),
 				new TaskEventDispatcher(),
 				new LocalConnectionManager(),
-				new Tuple2<>(0, 0));
+				new Tuple2<Integer, Integer>(0, 0),
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 640c11a..607da94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -59,7 +60,7 @@ public class TestSingleInputGate {
 		checkArgument(numberOfInputChannels >= 1);
 
 		SingleInputGate realGate = new SingleInputGate(
-				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class));
+				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		this.inputGate = spy(realGate);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index d8714d1..28f621f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -43,8 +44,8 @@ public class UnionInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final String testTaskName = "Test Task";
-		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class));
-		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class));
+		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index a2edce2..6d3f768 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.metrics.groups.JobMetricGroup;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
@@ -65,4 +66,19 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
 			super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
 		}
 	}
+	
+	public static class DummyIOMetricGroup extends IOMetricGroup { // IOMetricGroup stub for tests: addMetric is a no-op
+		public DummyIOMetricGroup() {
+			super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup()); // parent is a fresh UnregisteredTaskMetricsGroup
+		}
+
+		@Override
+		protected void addMetric(String name, Metric metric) { // empty body: the metric passed in is discarded
+		}
+
+		@Override
+		public MetricGroup addGroup(String name) { // sub-groups are not tracked; each call returns a new UnregisteredMetricsGroup
+			return new UnregisteredMetricsGroup();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 326a42f..dc7bbdb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -25,10 +25,12 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -102,10 +104,10 @@ public abstract class AbstractStreamOperator<OUT>
 	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
 		this.container = containingTask;
 		this.config = config;
-		this.output = output;
 		String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
 		
 		this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
+		this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
 		this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
 
 		stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
@@ -334,4 +336,30 @@ public abstract class AbstractStreamOperator<OUT>
 	public void disableInputCopy() {
 		this.inputCopyDisabled = true;
 	}
+
+	public class CountingOutput implements Output<StreamRecord<OUT>> { // Output decorator that counts collected records
+		private final Output<StreamRecord<OUT>> output; // wrapped output; every call is forwarded to it
+		private final Counter numRecordsOut; // incremented once per collect() call
+
+		public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
+			this.output = output;
+			this.numRecordsOut = counter;
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) { // watermarks pass through without being counted
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public void collect(StreamRecord<OUT> record) {
+			numRecordsOut.inc(); // counted before forwarding, so a record is counted even if the wrapped collect() throws
+			output.collect(record);
+		}
+
+		@Override
+		public void close() { // plain delegation; the counter is left untouched
+			output.close();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 84f59ed..68c623e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -51,6 +51,10 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		this.chainingStrategy = ChainingStrategy.HEAD;
 	}
 
+	public void run(final Object lockingObject) throws Exception { // convenience overload: uses this operator's own output field as the collector
+		run(lockingObject, output);
+	}
+
 	
 	public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
 		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index ab69ab7..33a0407 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -81,6 +83,8 @@ public class StreamInputProcessor<IN> {
 
 	private final DeserializationDelegate<StreamElement> deserializationDelegate;
 
+	private Counter numRecordsIn;
+
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
 								EventListener<CheckpointBarrier> checkpointListener,
@@ -133,6 +137,9 @@ public class StreamInputProcessor<IN> {
 		if (isFinished) {
 			return false;
 		}
+		if (numRecordsIn == null) {
+			numRecordsIn = streamOperator.getMetricGroup().counter("numRecordsIn");
+		}
 
 		while (true) {
 			if (currentRecordDeserializer != null) {
@@ -166,6 +173,7 @@ public class StreamInputProcessor<IN> {
 						// now we can do the actual processing
 						StreamRecord<IN> record = recordOrWatermark.asRecord();
 						synchronized (lock) {
+							numRecordsIn.inc();
 							streamOperator.setKeyContextElement1(record);
 							streamOperator.processElement(record);
 						}
@@ -211,9 +219,12 @@ public class StreamInputProcessor<IN> {
 	 * @param metrics metric group
      */
 	public void setMetricGroup(IOMetricGroup metrics) {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			deserializer.instantiateMetrics(metrics);
-		}
+		metrics.gauge("currentLowWatermark", new Gauge<Long>() { // registers a gauge that reports lastEmittedWatermark
+			@Override
+			public Long getValue() {
+				return lastEmittedWatermark; // NOTE(review): unsynchronized read at poll time - confirm the field is visible to the polling thread
+			}
+		});
 	}
 	
 	public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 733e7fb..1a66934 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -287,9 +288,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 	 * @param metrics metric group
 	 */
 	public void setMetricGroup(IOMetricGroup metrics) {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			deserializer.instantiateMetrics(metrics);
-		}
+		metrics.gauge("currentLowWatermark", new Gauge<Long>() { // registers a gauge over both inputs' watermark fields
+			@Override
+			public Long getValue() {
+				return Math.min(lastEmittedWatermark1, lastEmittedWatermark2); // smaller of the two fields; NOTE(review): unsynchronized read - confirm visibility
+			}
+		});
 	}
 	
 	public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 90abea4..761aa37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -298,14 +299,17 @@ public class OperatorChain<OUT> {
 	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
 		
 		protected final OneInputStreamOperator<T, ?> operator;
+		protected final Counter numRecordsIn;
 
 		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
 			this.operator = operator;
+			this.numRecordsIn = operator.getMetricGroup().counter("numRecordsIn");
 		}
 
 		@Override
 		public void collect(StreamRecord<T> record) {
 			try {
+				numRecordsIn.inc();
 				operator.setKeyContextElement1(record);
 				operator.processElement(record);
 			}
@@ -347,6 +351,7 @@ public class OperatorChain<OUT> {
 		@Override
 		public void collect(StreamRecord<T> record) {
 			try {
+				numRecordsIn.inc();
 				StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
 				operator.setKeyContextElement1(copy);
 				operator.processElement(copy);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index af9278f..7ae99f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -53,7 +53,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 	@Override
 	protected void run() throws Exception {
-		headOperator.run(getCheckpointLock(), getHeadOutput());
+		headOperator.run(getCheckpointLock());
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index af7d3f9..58e3cb8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -56,13 +57,27 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		
 		LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
 		
-		this.headOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
+		this.headOperator = new RecordPusher<>();
+		this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
 	}
 
 	private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
 		
 		private static final long serialVersionUID = 1L;
 
+		@Override
+		public void processElement(StreamRecord<IN> record) throws Exception {
+			output.collect(record); // forwards the record unchanged to this operator's output
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) {
+			// ignore: watermarks are not forwarded to the output
+		}
+	}
+
+	private static class IterationTailOutput<IN> implements Output<StreamRecord<IN>> {
+
 		@SuppressWarnings("NonSerializableFieldInSerializableClass")
 		private final BlockingQueue<StreamRecord<IN>> dataChannel;
 		
@@ -70,25 +85,32 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		
 		private final boolean shouldWait;
 
-		RecordPusher(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
+		IterationTailOutput(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
 			this.dataChannel = dataChannel;
 			this.iterationWaitTime = iterationWaitTime;
 			this.shouldWait =  iterationWaitTime > 0;
 		}
 
 		@Override
-		public void processElement(StreamRecord<IN> record) throws Exception {
-			if (shouldWait) {
-				dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
-			}
-			else {
-				dataChannel.put(record);
+		public void emitWatermark(Watermark mark) { // no-op: watermarks are dropped rather than put on the feedback queue
+		}
+
+		@Override
+		public void collect(StreamRecord<IN> record) {
+			try {
+				if (shouldWait) {
+					dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
+				}
+				else {
+					dataChannel.put(record);
+				}
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e);
 			}
 		}
 
 		@Override
-		public void processWatermark(Watermark mark) {
-			// ignore
+		public void close() {
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a771c85..444245c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -154,6 +155,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 	private long recoveryTimestamp;
 
+	private volatile long lastCheckpointSize = 0; // volatile: assigned from the asynchronous checkpoint thread and read by the "lastCheckpointSize" gauge
+
 	// ------------------------------------------------------------------------
 	//  Life cycle methods for specific implementations
 	// ------------------------------------------------------------------------
@@ -194,6 +197,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			// allow trigger tasks to be removed if all timers for that timestamp are removed by user
 			timerService.setRemoveOnCancelPolicy(true);
 
+			getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() { // registered on the task's metric group before init()
+				@Override
+				public Long getValue() {
+					return StreamTask.this.lastCheckpointSize; // stays 0 until a checkpoint with non-empty state has been acknowledged
+				}
+			});
+
 			// task specific initialization
 			init();
 			
@@ -538,6 +548,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				if (allStates.isEmpty()) {
 					getEnvironment().acknowledgeCheckpoint(checkpointId);
 				} else if (!hasAsyncStates) {
+					this.lastCheckpointSize = allStates.getStateSize();
 					getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
 				} else {
 					// start a Thread that does the asynchronous materialization and
@@ -572,6 +583,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 									}
 								}
 								StreamTaskStateList allStates = new StreamTaskStateList(states);
+								StreamTask.this.lastCheckpointSize = allStates.getStateSize();
 								getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
 								LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
 							}


[2/2] flink git commit: [FLINK-3949] Collect Metrics in Runtime Operators

Posted by ch...@apache.org.
[FLINK-3949] Collect Metrics in Runtime Operators

currentLowWatermark
lastCheckpointSize
numBytes(In/Out)(Local/Remote)
numRecords(In/Out)
numSplitsProcessed (Batch only)

This closes #2090


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c78b3c49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c78b3c49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c78b3c49

Branch: refs/heads/master
Commit: c78b3c49e0e82874cbfa71e88bf28b99ed395610
Parents: 1049585
Author: zentol <ch...@apache.org>
Authored: Wed Jun 1 11:24:40 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 16 12:24:57 2016 +0200

----------------------------------------------------------------------
 .../flink/metrics/groups/IOMetricGroup.java     | 22 +++------
 .../api/reader/AbstractRecordReader.java        |  8 ---
 .../io/network/api/reader/BufferReader.java     |  5 --
 .../io/network/api/reader/ReaderBase.java       |  8 ---
 .../AdaptiveSpanningRecordDeserializer.java     | 20 --------
 .../api/serialization/RecordDeserializer.java   |  8 ---
 .../serialization/SpanningRecordSerializer.java |  6 ---
 ...llingAdaptiveSpanningRecordDeserializer.java | 20 --------
 .../partition/consumer/InputChannel.java        |  8 ++-
 .../partition/consumer/LocalInputChannel.java   | 12 +++--
 .../partition/consumer/RemoteInputChannel.java  | 12 +++--
 .../partition/consumer/SingleInputGate.java     | 20 +++++---
 .../partition/consumer/UnknownInputChannel.java | 13 +++--
 .../AbstractCachedBuildSideJoinDriver.java      | 12 +++--
 .../operators/AbstractOuterJoinDriver.java      | 14 ++++--
 .../operators/AllGroupCombineDriver.java        | 10 +++-
 .../runtime/operators/AllReduceDriver.java      | 14 +++++-
 .../flink/runtime/operators/BatchTask.java      |  1 -
 .../flink/runtime/operators/CoGroupDriver.java  | 13 +++--
 .../flink/runtime/operators/CrossDriver.java    | 46 ++++++++++++------
 .../flink/runtime/operators/DataSinkTask.java   | 11 +++--
 .../flink/runtime/operators/DataSourceTask.java | 12 ++++-
 .../flink/runtime/operators/FlatMapDriver.java  |  8 ++-
 .../runtime/operators/GroupReduceDriver.java    | 10 +++-
 .../flink/runtime/operators/JoinDriver.java     | 12 +++--
 .../flink/runtime/operators/MapDriver.java      |  8 ++-
 .../runtime/operators/MapPartitionDriver.java   |  9 +++-
 .../runtime/operators/NoOpChainedDriver.java    |  1 +
 .../flink/runtime/operators/NoOpDriver.java     |  8 ++-
 .../runtime/operators/ReduceCombineDriver.java  | 10 +++-
 .../flink/runtime/operators/ReduceDriver.java   | 11 ++++-
 .../operators/UnionWithTempOperator.java        |  8 ++-
 .../chaining/ChainedAllReduceDriver.java        |  1 +
 .../operators/chaining/ChainedDriver.java       | 12 ++++-
 .../chaining/ChainedFlatMapDriver.java          |  1 +
 .../operators/chaining/ChainedMapDriver.java    |  1 +
 .../ChainedTerminationCriterionDriver.java      |  1 +
 .../chaining/GroupCombineChainedDriver.java     |  1 +
 .../SynchronousChainedCombineDriver.java        |  1 +
 .../util/metrics/CountingCollector.java         | 42 ++++++++++++++++
 .../util/metrics/CountingIterable.java          | 38 +++++++++++++++
 .../util/metrics/CountingIterator.java          | 48 ++++++++++++++++++
 .../metrics/CountingMutableObjectIterator.java  | 51 ++++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  3 +-
 .../network/api/reader/AbstractReaderTest.java  |  5 --
 .../partition/consumer/InputChannelTest.java    |  3 +-
 .../consumer/LocalInputChannelTest.java         | 10 ++--
 .../consumer/RemoteInputChannelTest.java        | 10 ++--
 .../partition/consumer/SingleInputGateTest.java | 19 +++++---
 .../partition/consumer/TestSingleInputGate.java |  3 +-
 .../partition/consumer/UnionInputGateTest.java  |  5 +-
 .../testutils/UnregisteredTaskMetricsGroup.java | 16 ++++++
 .../api/operators/AbstractStreamOperator.java   | 30 +++++++++++-
 .../streaming/api/operators/StreamSource.java   |  4 ++
 .../runtime/io/StreamInputProcessor.java        | 17 +++++--
 .../runtime/io/StreamTwoInputProcessor.java     | 10 ++--
 .../streaming/runtime/tasks/OperatorChain.java  |  5 ++
 .../runtime/tasks/SourceStreamTask.java         |  2 +-
 .../runtime/tasks/StreamIterationTail.java      | 42 ++++++++++++----
 .../streaming/runtime/tasks/StreamTask.java     | 12 +++++
 60 files changed, 573 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
index b34c844..46bf2af 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
@@ -26,33 +26,27 @@ import org.apache.flink.metrics.MetricRegistry;
  */
 public class IOMetricGroup extends AbstractMetricGroup {
 
-	private final Counter numBytesIn;
 	private final Counter numBytesOut;
-	private final Counter numRecordsIn;
-	private final Counter numRecordsOut;
+	private final Counter numBytesInLocal;
+	private final Counter numBytesInRemote;
 
 	public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) {
 		super(registry, parent.getScopeComponents());
-
-		this.numBytesIn = parent.counter("numBytesIn");
 		this.numBytesOut = parent.counter("numBytesOut");
-		this.numRecordsIn = parent.counter("numRecordsIn");
-		this.numRecordsOut = parent.counter("numRecordsOut");
-	}
 
-	public Counter getBytesInCounter() {
-		return numBytesIn;
+		this.numBytesInLocal = parent.counter("numBytesInLocal");
+		this.numBytesInRemote = parent.counter("numBytesInRemote");
 	}
 
 	public Counter getBytesOutCounter() {
 		return numBytesOut;
 	}
 
-	public Counter getRecordsInCounter() {
-		return numRecordsIn;
+	public Counter getNumBytesInLocalCounter() {
+		return numBytesInLocal;
 	}
 
-	public Counter getRecordsOutCounter() {
-		return numRecordsOut;
+	public Counter getNumBytesInRemoteCounter() {
+		return numBytesInRemote;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 48ac558..e0fe355 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
@@ -131,11 +130,4 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 			deserializer.setReporter(reporter);
 		}
 	}
-	
-	@Override
-	public void setMetricGroup(IOMetricGroup metrics) {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			deserializer.instantiateMetrics(metrics);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index e5f5930..debb352 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -54,8 +53,4 @@ public final class BufferReader extends AbstractReader {
 	public void setReporter(AccumulatorRegistry.Reporter reporter) {
 
 	}
-
-	@Override
-	public void setMetricGroup(IOMetricGroup metrics) {
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index 192a9ab..a1d705f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import java.io.IOException;
 
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
@@ -58,11 +57,4 @@ public interface ReaderBase {
 	 */
 	void setReporter(AccumulatorRegistry.Reporter reporter);
 
-	/**
-	 * Setter for the metric group.
-	 *
-	 * @param metrics metric group to set
-	 */
-	void setMetricGroup(IOMetricGroup metrics);
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
index 1c17476..cdd8731 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
@@ -49,9 +47,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 	private Buffer currentBuffer;
 
 	private AccumulatorRegistry.Reporter reporter;
-	
-	private transient Counter numRecordsIn;
-	private transient Counter numBytesIn;
 
 	public AdaptiveSpanningRecordDeserializer() {
 		this.nonSpanningWrapper = new NonSpanningWrapper();
@@ -101,9 +96,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 			if (reporter != null) {
 				reporter.reportNumBytesIn(len);
 			}
-			if (numBytesIn != null) {
-				numBytesIn.inc(len);
-			}
 
 			if (len <= nonSpanningRemaining - 4) {
 				// we can get a full record from here
@@ -112,9 +104,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 				if (reporter != null) {
 					reporter.reportNumRecordsIn(1);
 				}
-				if (numRecordsIn != null) {
-					numRecordsIn.inc();
-				}
 
 				return (this.nonSpanningWrapper.remaining() == 0) ?
 						DeserializationResult.LAST_RECORD_FROM_BUFFER :
@@ -142,9 +131,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 			if (reporter != null) {
 				reporter.reportNumRecordsIn(1);
 			}
-			if (numRecordsIn != null) {
-				numRecordsIn.inc();
-			}
 
 			// move the remainder to the non-spanning wrapper
 			// this does not copy it, only sets the memory segment
@@ -179,12 +165,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		this.spanningWrapper.setReporter(reporter);
 	}
 
-	@Override
-	public void instantiateMetrics(IOMetricGroup metrics) {
-		numBytesIn = metrics.getBytesInCounter();
-		numRecordsIn = metrics.getRecordsInCounter();
-	}
-
 	// -----------------------------------------------------------------------------------------------------------------
 
 	private static final class NonSpanningWrapper implements DataInputView {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index 2f0c1ac..e4c7890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
@@ -71,11 +70,4 @@ public interface RecordDeserializer<T extends IOReadableWritable> {
 	 * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read.
 	 */
 	void setReporter(AccumulatorRegistry.Reporter reporter);
-
-	/**
-	 * Instantiates all metrics.
-	 *
-	 * @param metrics metric group
-	 */
-	void instantiateMetrics(IOMetricGroup metrics);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 6495650..b218de8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -55,7 +55,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 	private AccumulatorRegistry.Reporter reporter;
 
-	private transient Counter numRecordsOut;
 	private transient Counter numBytesOut;
 
 	public SpanningRecordSerializer() {
@@ -94,10 +93,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		if (numBytesOut != null) {
 			numBytesOut.inc(len);
 		}
-		
-		if (numRecordsOut != null) {
-			numRecordsOut.inc();
-		}
 
 		this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
 
@@ -204,6 +199,5 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	@Override
 	public void instantiateMetrics(IOMetricGroup metrics) {
 		numBytesOut = metrics.getBytesOutCounter();
-		numRecordsOut = metrics.getRecordsOutCounter();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 7e96390..eab8e7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -22,8 +22,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
@@ -63,9 +61,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	private AccumulatorRegistry.Reporter reporter;
 
-	private Counter numRecordsIn;
-	private Counter numBytesIn;
-
 	public SpillingAdaptiveSpanningRecordDeserializer(String[] tmpDirectories) {
 		this.nonSpanningWrapper = new NonSpanningWrapper();
 		this.spanningWrapper = new SpanningWrapper(tmpDirectories);
@@ -114,9 +109,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			if (reporter != null) {
 				reporter.reportNumBytesIn(len);
 			}
-			if (numBytesIn != null) {
-				numBytesIn.inc(len);
-			}
 
 			if (len <= nonSpanningRemaining - 4) {
 				// we can get a full record from here
@@ -126,9 +118,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					if (reporter != null) {
 						reporter.reportNumRecordsIn(1);
 					}
-					if (numRecordsIn != null) {
-						numRecordsIn.inc();
-					}
 
 					int remaining = this.nonSpanningWrapper.remaining();
 					if (remaining > 0) {
@@ -168,9 +157,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			if (reporter != null) {
 				reporter.reportNumRecordsIn(1);
 			}
-			if (numRecordsIn != null) {
-				numRecordsIn.inc();
-			}
 			
 			// move the remainder to the non-spanning wrapper
 			// this does not copy it, only sets the memory segment
@@ -202,12 +188,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		this.spanningWrapper.setReporter(reporter);
 	}
 
-	@Override
-	public void instantiateMetrics(IOMetricGroup metrics) {
-		numBytesIn = metrics.getBytesInCounter();
-		numRecordsIn = metrics.getRecordsInCounter();
-	}
-
 
 	// -----------------------------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index e6e078d..5d82903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -61,6 +62,8 @@ public abstract class InputChannel {
 	/** The maximum backoff (in ms). */
 	private final int maxBackoff;
 
+	protected final Counter numBytesIn;
+
 	/** The current backoff (in ms) */
 	private int currentBackoff;
 
@@ -68,7 +71,8 @@ public abstract class InputChannel {
 			SingleInputGate inputGate,
 			int channelIndex,
 			ResultPartitionID partitionId,
-			Tuple2<Integer, Integer> initialAndMaxBackoff) {
+			Tuple2<Integer, Integer> initialAndMaxBackoff,
+			Counter numBytesIn) {
 
 		checkArgument(channelIndex >= 0);
 
@@ -84,6 +88,8 @@ public abstract class InputChannel {
 		this.initialBackoff = initial;
 		this.maxBackoff = max;
 		this.currentBackoff = initial == 0 ? -1 : 0;
+
+		this.numBytesIn = numBytesIn;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 92b6d1f..fc35bef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -67,10 +68,11 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 			int channelIndex,
 			ResultPartitionID partitionId,
 			ResultPartitionManager partitionManager,
-			TaskEventDispatcher taskEventDispatcher) {
+			TaskEventDispatcher taskEventDispatcher,
+			IOMetricGroup metrics) {
 
 		this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
-				new Tuple2<Integer, Integer>(0, 0));
+				new Tuple2<Integer, Integer>(0, 0), metrics);
 	}
 
 	LocalInputChannel(
@@ -79,9 +81,10 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 			ResultPartitionID partitionId,
 			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher,
-			Tuple2<Integer, Integer> initialAndMaxBackoff) {
+			Tuple2<Integer, Integer> initialAndMaxBackoff,
+			IOMetricGroup metrics) {
 
-		super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
+		super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInLocalCounter());
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -165,6 +168,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 		getNextLookAhead();
 
+		numBytesIn.inc(next.getSize());
 		return next;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 814e961..b316fd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -80,10 +81,11 @@ public class RemoteInputChannel extends InputChannel {
 			int channelIndex,
 			ResultPartitionID partitionId,
 			ConnectionID connectionId,
-			ConnectionManager connectionManager) {
+			ConnectionManager connectionManager,
+			IOMetricGroup metrics) {
 
 		this(inputGate, channelIndex, partitionId, connectionId, connectionManager,
-				new Tuple2<Integer, Integer>(0, 0));
+				new Tuple2<Integer, Integer>(0, 0), metrics);
 	}
 
 	public RemoteInputChannel(
@@ -92,9 +94,10 @@ public class RemoteInputChannel extends InputChannel {
 			ResultPartitionID partitionId,
 			ConnectionID connectionId,
 			ConnectionManager connectionManager,
-			Tuple2<Integer, Integer> initialAndMaxBackoff) {
+			Tuple2<Integer, Integer> initialAndMaxBackoff,
+			IOMetricGroup metrics) {
 
-		super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
+		super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter());
 
 		this.connectionId = checkNotNull(connectionId);
 		this.connectionManager = checkNotNull(connectionManager);
@@ -148,6 +151,7 @@ public class RemoteInputChannel extends InputChannel {
 				throw new IOException("Queried input channel for data although non is available.");
 			}
 
+			numBytesIn.inc(buffer.getSize());
 			return buffer;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 81d202a..90e395c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -173,7 +174,8 @@ public class SingleInputGate implements InputGate {
 			IntermediateDataSetID consumedResultId,
 			int consumedSubpartitionIndex,
 			int numberOfInputChannels,
-			PartitionStateChecker partitionStateChecker) {
+			PartitionStateChecker partitionStateChecker,
+			IOMetricGroup metrics) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
 		this.jobId = checkNotNull(jobId);
@@ -502,7 +504,8 @@ public class SingleInputGate implements InputGate {
 			JobID jobId,
 			ExecutionAttemptID executionId,
 			InputGateDeploymentDescriptor igdd,
-			NetworkEnvironment networkEnvironment) {
+			NetworkEnvironment networkEnvironment,
+			IOMetricGroup metrics) {
 
 		final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
 
@@ -512,7 +515,8 @@ public class SingleInputGate implements InputGate {
 		final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
 		final SingleInputGate inputGate = new SingleInputGate(
-				owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, icdd.length, networkEnvironment.getPartitionStateChecker());
+				owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
+				icdd.length, networkEnvironment.getPartitionStateChecker(), metrics);
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];
@@ -526,13 +530,16 @@ public class SingleInputGate implements InputGate {
 				inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
 						networkEnvironment.getPartitionManager(),
 						networkEnvironment.getTaskEventDispatcher(),
-						networkEnvironment.getPartitionRequestInitialAndMaxBackoff());
+						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
+						metrics
+				);
 			}
 			else if (partitionLocation.isRemote()) {
 				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
 						partitionLocation.getConnectionId(),
 						networkEnvironment.getConnectionManager(),
-						networkEnvironment.getPartitionRequestInitialAndMaxBackoff()
+						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
+						metrics
 				);
 			}
 			else if (partitionLocation.isUnknown()) {
@@ -540,7 +547,8 @@ public class SingleInputGate implements InputGate {
 						networkEnvironment.getPartitionManager(),
 						networkEnvironment.getTaskEventDispatcher(),
 						networkEnvironment.getConnectionManager(),
-						networkEnvironment.getPartitionRequestInitialAndMaxBackoff()
+						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
+						metrics
 				);
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 015f3fa..840c805 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -47,6 +48,8 @@ public class UnknownInputChannel extends InputChannel {
 	/** Initial and maximum backoff (in ms) after failed partition requests. */
 	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
 
+	private final IOMetricGroup metrics;
+
 	public UnknownInputChannel(
 			SingleInputGate gate,
 			int channelIndex,
@@ -54,14 +57,16 @@ public class UnknownInputChannel extends InputChannel {
 			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher,
 			ConnectionManager connectionManager,
-			Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff) {
+			Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff,
+			IOMetricGroup metrics) {
 
-		super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff);
+		super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff, null);
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
 		this.connectionManager = checkNotNull(connectionManager);
 		this.partitionRequestInitialAndMaxBackoff = checkNotNull(partitionRequestInitialAndMaxBackoff);
+		this.metrics = checkNotNull(metrics);
 	}
 
 	@Override
@@ -112,10 +117,10 @@ public class UnknownInputChannel extends InputChannel {
 	// ------------------------------------------------------------------------
 
 	public RemoteInputChannel toRemoteInputChannel(ConnectionID producerAddress) {
-		return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff);
+		return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff, metrics);
 	}
 
 	public LocalInputChannel toLocalInputChannel() {
-		return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff);
+		return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff, metrics);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index e034dd6..406d430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashJoinIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -63,13 +66,15 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 	@Override
 	public void initialize() throws Exception {
 		TaskConfig config = this.taskContext.getTaskConfig();
+
+		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
 		
 		TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
 		TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
 		TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
 		TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
-		MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
-		MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
+		MutableObjectIterator<IT1> input1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
+		MutableObjectIterator<IT2> input2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
 
 		TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = 
 				this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
@@ -164,8 +169,9 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 
 	@Override
 	public void run() throws Exception {
+		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
 		final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 		
 		while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
index 2589ca5..a28e27e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
@@ -23,10 +23,13 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
@@ -84,9 +87,10 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
 		final double driverMemFraction = config.getRelativeMemoryDriver();
 		
 		final DriverStrategy ls = config.getDriverStrategy();
-		
-		final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
-		final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
+
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
+		final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
 		
 		// get serializers and comparators
 		final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
@@ -147,8 +151,10 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
 	
 	@Override
 	public void run() throws Exception {
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+		
 		final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 		final JoinTaskIterator<IT1, IT2, OT> outerJoinIterator = this.outerJoinIterator;
 		
 		while (this.running && outerJoinIterator.callWithNextKey(joinStub, collector)) ;

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
index 0c8dc34..f0673c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.util.Collector;
@@ -91,12 +94,15 @@ public class AllGroupCombineDriver<IN, OUT> implements Driver<GroupCombineFuncti
 			LOG.debug("AllGroupCombine starting.");
 		}
 
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+
 		final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
 		TypeSerializer<IN> serializer = serializerFactory.getSerializer();
 
-		final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
+		final MutableObjectIterator<IN> in = new CountingMutableObjectIterator<>(this.taskContext.<IN>getInput(0), numRecordsIn);
 		final GroupCombineFunction<IN, OUT> reducer = this.taskContext.getStub();
-		final Collector<OUT> output = this.taskContext.getOutputCollector();
+		final Collector<OUT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		if (objectReuseEnabled) {
 			final ReusingMutableToRegularIteratorWrapper<IN> inIter = new ReusingMutableToRegularIteratorWrapper<IN>(in, serializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index e8545e7..13d7222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -20,6 +20,9 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -104,14 +107,19 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 			LOG.debug(this.taskContext.formatLogString("AllReduce preprocessing done. Running Reducer code."));
 		}
 
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+
 		final ReduceFunction<T> stub = this.taskContext.getStub();
 		final MutableObjectIterator<T> input = this.input;
 		final TypeSerializer<T> serializer = this.serializer;
+		final Collector<T> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		T val1;
 		if ((val1 = input.next()) == null) {
 			return;
 		}
+		numRecordsIn.inc();
 
 		if (objectReuseEnabled) {
 			// We only need two objects. The first reference stores results and is
@@ -121,6 +129,7 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 			T value = val1;
 
 			while (running && (val2 = input.next(val2)) != null) {
+				numRecordsIn.inc();
 				value = stub.reduce(value, val2);
 
 				// we must never read into the object returned
@@ -132,14 +141,15 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 				}
 			}
 
-			this.taskContext.getOutputCollector().collect(value);
+			collector.collect(value);
 		} else {
 			T val2;
 			while (running && (val2 = input.next()) != null) {
+				numRecordsIn.inc();
 				val1 = stub.reduce(val1, val2);
 			}
 
-			this.taskContext.getOutputCollector().collect(val1);
+			collector.collect(val1);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index f38b988..36965ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -676,7 +676,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 			}
 
 			inputReaders[i].setReporter(reporter);
-			inputReaders[i].setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
 
 			currentReaderOffset += groupSize;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index 665ab0e..43a913d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -20,7 +20,10 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -93,9 +96,11 @@ public class CoGroupDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1,
 		if (config.getDriverStrategy() != DriverStrategy.CO_GROUP) {
 			throw new Exception("Unrecognized driver strategy for CoGoup driver: " + config.getDriverStrategy().name());
 		}
+
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
 		
-		final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
-		final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
+		final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
+		final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
 		
 		// get the key positions and types
 		final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
@@ -144,8 +149,10 @@ public class CoGroupDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1,
 	@Override
 	public void run() throws Exception
 	{
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+
 		final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 		final CoGroupTaskIterator<IT1, IT2> coGroupIterator = this.coGroupIterator;
 		
 		while (this.running && coGroupIterator.next()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index c9d84b1..3e1d01f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -20,6 +20,9 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.CrossFunction;
@@ -194,9 +197,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 			LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " +
 					"First input is outer (blocking) side, second input is inner (spilling) side."));
 		}
-			
-		final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
-		final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
+
+		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+
+		final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
+		final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
 		
 		final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
 		final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
@@ -213,7 +219,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 		
 
 		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 
 		if (objectReuseEnabled) {
@@ -259,9 +265,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 			LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: " +
 					"First input is inner (spilling) side, second input is outer (blocking) side."));
 		}
-		
-		final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
-		final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
+
+		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+
+		final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
+		final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
 		
 		final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
 		final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
@@ -277,7 +286,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 		this.blockIter = blockVals;
 		
 		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		if (objectReuseEnabled) {
 			final T1 val1Reuse = serializer1.createInstance();
@@ -322,9 +331,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 			LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " +
 					"First input is outer side, second input is inner (spilling) side."));
 		}
-		
-		final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
-		final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
+
+		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+
+		final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
+		final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
 		
 		final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
 		final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
@@ -335,7 +347,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 		this.spillIter = spillVals;
 		
 		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		if (objectReuseEnabled) {
 			final T1 val1Reuse = serializer1.createInstance();
@@ -372,8 +384,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 			LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: " +
 					"First input is inner (spilling) side, second input is outer side."));
 		}
-		final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
-		final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
+
+		final Counter numRecordsIn = taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = taskContext.getMetricGroup().counter("numRecordsOut");
+
+		final MutableObjectIterator<T1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<T1>getInput(0), numRecordsIn);
+		final MutableObjectIterator<T2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<T2>getInput(1), numRecordsIn);
 		
 		final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
 		final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();
@@ -384,7 +400,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>
 		this.spillIter = spillVals;
 
 		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		if (objectReuseEnabled) {
 			final T1 val1Reuse = serializer1.createInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 380edd4..b73c85a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.io.RichOutputFormat;
@@ -27,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -104,8 +106,11 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		// --------------------------------------------------------------------
 		LOG.debug(getLogString("Starting data sink operator"));
 
+		RuntimeContext ctx = createRuntimeContext();
+		final Counter numRecordsIn = ctx.getMetricGroup().counter("numRecordsIn");
+		
 		if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
-			((RichOutputFormat) this.format).setRuntimeContext(createRuntimeContext());
+			((RichOutputFormat) this.format).setRuntimeContext(ctx);
 			LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
 		}
 
@@ -174,6 +179,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
 				// work!
 				while (!this.taskCanceled && ((record = input.next(record)) != null)) {
+					numRecordsIn.inc();
 					format.writeRecord(record);
 				}
 			} else {
@@ -181,6 +187,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
 				// work!
 				while (!this.taskCanceled && ((record = input.next()) != null)) {
+					numRecordsIn.inc();
 					format.writeRecord(record);
 				}
 			}
@@ -349,8 +356,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
 		inputReader.setReporter(reporter);
 		
-		inputReader.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
-
 		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
 		@SuppressWarnings({ "rawtypes" })
 		final MutableObjectIterator<?> iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 819b84f..c57f133 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -35,6 +37,7 @@ import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,8 +100,12 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		// --------------------------------------------------------------------
 		LOG.debug(getLogString("Starting data source operator"));
 
+		RuntimeContext ctx = createRuntimeContext();
+		Counter splitCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
+		Counter numRecordsOut = ctx.getMetricGroup().counter("numRecordsOut");
+
 		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
-			((RichInputFormat) this.format).setRuntimeContext(createRuntimeContext());
+			((RichInputFormat) this.format).setRuntimeContext(ctx);
 			LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
 			((RichInputFormat) this.format).openInputFormat();
 			LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
@@ -135,7 +142,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 				LOG.debug(getLogString("Starting to read input from split " + split.toString()));
 				
 				try {
-					final Collector<OT> output = this.output;
+					final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);
 
 					if (objectReuseEnabled) {
 						OT reuse = serializer.createInstance();
@@ -165,6 +172,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 					// close. We close here such that a regular close throwing an exception marks a task as failed.
 					format.close();
 				}
+				splitCounter.inc();
 			} // end for all input splits
 
 			// close the collector. if it is a chaining task collector, it will close its chained tasks

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index c29923b..5b4a6ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
@@ -83,22 +85,26 @@ public class FlatMapDriver<IT, OT> implements Driver<FlatMapFunction<IT, OT>, OT
 
 	@Override
 	public void run() throws Exception {
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
 		// cache references on the stack
 		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
 		final FlatMapFunction<IT, OT> function = this.taskContext.getStub();
-		final Collector<OT> output = this.taskContext.getOutputCollector();
+		final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		if (objectReuseEnabled) {
 			IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
 
 
 			while (this.running && ((record = input.next(record)) != null)) {
+				numRecordsIn.inc();
 				function.flatMap(record, output);
 			}
 		} else {
 			IT record;
 
 			while (this.running && ((record = input.next()) != null)) {
+				numRecordsIn.inc();
 				function.flatMap(record, output);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index d6825ac..ccd88ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -19,6 +19,9 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -89,9 +92,11 @@ public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT,
 		if (config.getDriverStrategy() != DriverStrategy.SORTED_GROUP_REDUCE) {
 			throw new Exception("Unrecognized driver strategy for GroupReduce driver: " + config.getDriverStrategy().name());
 		}
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		
 		this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
 		this.comparator = this.taskContext.getDriverComparator(0);
-		this.input = this.taskContext.getInput(0);
+		this.input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);
 
 		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
 		this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
@@ -106,10 +111,11 @@ public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT,
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. Running GroupReducer code."));
 		}
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
 
 		// cache references on the stack
 		final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
-		final Collector<OT> output = this.taskContext.getOutputCollector();
+		final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 		
 		if (objectReuseEnabled) {
 			final ReusingKeyGroupedIterator<IT> iter = new ReusingKeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index f7ad8d1..efb59a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
@@ -34,6 +35,8 @@ import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -84,6 +87,8 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
 	@Override
 	public void prepare() throws Exception{
 		final TaskConfig config = this.taskContext.getTaskConfig();
+
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
 		
 		// obtain task manager's memory manager and I/O manager
 		final MemoryManager memoryManager = this.taskContext.getMemoryManager();
@@ -96,8 +101,8 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
 		// test minimum memory requirements
 		final DriverStrategy ls = config.getDriverStrategy();
 		
-		final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
-		final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
+		final MutableObjectIterator<IT1> in1 = new CountingMutableObjectIterator<>(this.taskContext.<IT1>getInput(0), numRecordsIn);
+		final MutableObjectIterator<IT2> in2 = new CountingMutableObjectIterator<>(this.taskContext.<IT2>getInput(1), numRecordsIn);
 
 		// get the key positions and types
 		final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
@@ -209,8 +214,9 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
 
 	@Override
 	public void run() throws Exception {
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
 		final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 		final JoinTaskIterator<IT1, IT2, OT> joinIterator = this.joinIterator;
 		
 		while (this.running && joinIterator.callWithNextKey(joinStub, collector));

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index eefe8e4..65f9061 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -78,15 +80,18 @@ public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {
 
 	@Override
 	public void run() throws Exception {
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
 		// cache references on the stack
 		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
 		final MapFunction<IT, OT> function = this.taskContext.getStub();
-		final Collector<OT> output = this.taskContext.getOutputCollector();
+		final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		if (objectReuseEnabled) {
 			IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
 	
 			while (this.running && ((record = input.next(record)) != null)) {
+				numRecordsIn.inc();
 				output.collect(function.map(record));
 			}
 		}
@@ -94,6 +99,7 @@ public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {
 			IT record = null;
 			
 			while (this.running && ((record = input.next()) != null)) {
+				numRecordsIn.inc();
 				output.collect(function.map(record));
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
index 8f245f0..3496e14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
 import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.util.Collector;
@@ -83,10 +86,12 @@ public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<I
 
 	@Override
 	public void run() throws Exception {
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
 		// cache references on the stack
-		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
+		final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);
 		final MapPartitionFunction<IT, OT> function = this.taskContext.getStub();
-		final Collector<OT> output = this.taskContext.getOutputCollector();
+		final Collector<OT> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		if (objectReuseEnabled) {
 			final ReusingMutableToRegularIteratorWrapper<IT> inIter = new ReusingMutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java
index 9b08fad..802227a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpChainedDriver.java
@@ -61,6 +61,7 @@ public class NoOpChainedDriver<IT> extends ChainedDriver<IT, IT> {
 	@Override
 	public void collect(IT record) {
 		try {
+			this.numRecordsIn.inc();
 			this.outputCollector.collect(record);
 		} catch (Exception ex) {
 			throw new ExceptionInChainedStubException(this.taskName, ex);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index 073a837..dd64b76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
@@ -75,18 +77,22 @@ public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> {
 	@Override
 	public void run() throws Exception {
 		// cache references on the stack
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
 		final MutableObjectIterator<T> input = this.taskContext.getInput(0);
-		final Collector<T> output = this.taskContext.getOutputCollector();
+		final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		if (objectReuseEnabled) {
 			T record = this.taskContext.<T>getInputSerializer(0).getSerializer().createInstance();
 
 			while (this.running && ((record = input.next(record)) != null)) {
+				numRecordsIn.inc();
 				output.collect(record);
 			}
 		} else {
 			T record;
 			while (this.running && ((record = input.next()) != null)) {
+				numRecordsIn.inc();
 				output.collect(record);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 1ceeaf0..e1ce39e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -104,13 +106,15 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
 		if (this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_PARTIAL_REDUCE) {
 			throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for reduce combiner.");
 		}
+
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
 		
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
 		this.comparator = this.taskContext.getDriverComparator(0);
 		this.serializer = serializerFactory.getSerializer();
 		this.reducer = this.taskContext.getStub();
-		this.output = this.taskContext.getOutputCollector();
+		this.output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		MemoryManager memManager = this.taskContext.getMemoryManager();
 		final int numMemoryPages = memManager.computeNumberOfPages(
@@ -140,6 +144,8 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
 			LOG.debug("Combiner starting.");
 		}
 		
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		
 		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
 		final TypeSerializer<T> serializer = this.serializer;
 		
@@ -147,6 +153,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
 			T value = serializer.createInstance();
 		
 			while (running && (value = in.next(value)) != null) {
+				numRecordsIn.inc();
 				
 				// try writing to the sorter first
 				if (this.sorter.write(value)) {
@@ -166,6 +173,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
 		else {
 			T value;
 			while (running && (value = in.next()) != null) {
+				numRecordsIn.inc();
 
 				// try writing to the sorter first
 				if (this.sorter.write(value)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 3b7af6e..eb4f2f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -20,6 +20,8 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -106,6 +108,9 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 			LOG.debug(this.taskContext.formatLogString("Reducer preprocessing done. Running Reducer code."));
 		}
 
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
+
 		// cache references on the stack
 		final MutableObjectIterator<T> input = this.input;
 		final TypeSerializer<T> serializer = this.serializer;
@@ -113,7 +118,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 		
 		final ReduceFunction<T> function = this.taskContext.getStub();
 		
-		final Collector<T> output = this.taskContext.getOutputCollector();
+		final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 
 		if (objectReuseEnabled) {
 			// We only need two objects. The first reference stores results and is
@@ -128,10 +133,12 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 
 			// iterate over key groups
 			while (this.running && value != null) {
+				numRecordsIn.inc();
 				comparator.setReference(value);
 
 				// iterate within a key group
 				while ((reuse2 = input.next(reuse2)) != null) {
+					numRecordsIn.inc();
 					if (comparator.equalToReference(reuse2)) {
 						// same group, reduce
 						value = function.reduce(value, reuse2);
@@ -163,11 +170,13 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 
 			// iterate over key groups
 			while (this.running && value != null) {
+				numRecordsIn.inc();
 				comparator.setReference(value);
 				T res = value;
 
 				// iterate within a key group
 				while ((value = input.next()) != null) {
+					numRecordsIn.inc();
 					if (comparator.equalToReference(value)) {
 						// same group, reduce
 						res = function.reduce(res, value);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
index 4791761..3d52925 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -58,18 +60,22 @@ public class UnionWithTempOperator<T> implements Driver<Function, T> {
 
 	@Override
 	public void run() throws Exception {
+		final Counter numRecordsIn = this.taskContext.getMetricGroup().counter("numRecordsIn");
+		final Counter numRecordsOut = this.taskContext.getMetricGroup().counter("numRecordsOut");
 		
-		final Collector<T> output = this.taskContext.getOutputCollector();
+		final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
 		T reuse = this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance();
 		T record;
 		
 		final MutableObjectIterator<T> input = this.taskContext.getInput(STREAMED_INPUT);
 		while (this.running && ((record = input.next(reuse)) != null)) {
+			numRecordsIn.inc();
 			output.collect(record);
 		}
 		
 		final MutableObjectIterator<T> cache = this.taskContext.getInput(CACHED_INPUT);
 		while (this.running && ((record = cache.next(reuse)) != null)) {
+			numRecordsIn.inc();
 			output.collect(record);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
index 46ee41b..1e3482f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
@@ -86,6 +86,7 @@ public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
 	// --------------------------------------------------------------------------------------------
 	@Override
 	public void collect(IT record) {
+		numRecordsIn.inc();
 		try {
 			if (base == null) {
 				base = objectReuseEnabled ? record : serializer.copy(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/c78b3c49/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 407716f..2560135 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -22,12 +22,14 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
 import org.apache.flink.util.Collector;
 
 import java.util.Map;
@@ -53,6 +55,10 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 	protected boolean objectReuseEnabled = false;
 	
 	protected MetricGroup metrics;
+	
+	protected Counter numRecordsIn;
+	
+	protected Counter numRecordsOut;
 
 	
 	public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
@@ -61,9 +67,11 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 	{
 		this.config = config;
 		this.taskName = taskName;
-		this.outputCollector = outputCollector;
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
+		this.numRecordsIn = this.metrics.counter("numRecordsIn");
+		this.numRecordsOut = this.metrics.counter("numRecordsOut");
+		this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
 
 		Environment env = parent.getEnvironment();
 
@@ -103,7 +111,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 
 	@SuppressWarnings("unchecked")
 	public void setOutputCollector(Collector<?> outputCollector) {
-		this.outputCollector = (Collector<OT>) outputCollector;
+		this.outputCollector = new CountingCollector<>((Collector<OT>) outputCollector, numRecordsOut);
 	}
 
 	public Collector<OT> getOutputCollector() {