You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/31 15:08:30 UTC

[2/5] flink git commit: [FLINK-4733] Reuse operator IO metrics for task

[FLINK-4733] Reuse operator IO metrics for task


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f4f6f92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f4f6f92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f4f6f92

Branch: refs/heads/master
Commit: 1f4f6f928cb71fcbcba8d5aca731e738352d1175
Parents: 99f1dc3
Author: zentol <ch...@apache.org>
Authored: Mon Oct 31 14:08:26 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Oct 31 15:12:02 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/IOMetrics.java | 99 ++++++++++++++++++++
 .../metrics/groups/OperatorIOMetricGroup.java   | 17 ++++
 .../metrics/groups/TaskIOMetricGroup.java       | 67 ++++++++++++-
 .../flink/runtime/operators/BatchTask.java      |  8 ++
 .../flink/runtime/operators/DataSinkTask.java   |  2 +
 .../flink/runtime/operators/DataSourceTask.java |  4 +
 .../operators/chaining/ChainedDriver.java       |  4 +
 .../metrics/groups/TaskIOMetricGroupTest.java   | 61 ++++++++++++
 .../flink/streaming/api/graph/StreamConfig.java |  9 ++
 .../api/graph/StreamingJobGraphGenerator.java   |  3 +
 .../api/operators/AbstractStreamOperator.java   |  6 ++
 11 files changed, 278 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
new file mode 100644
index 0000000..15c54b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.Meter;
+
+import java.io.Serializable;
+
+/**
+ * An instance of this class represents a snapshot of the io-related metrics of a single task.
+ */
+public class IOMetrics implements Serializable {
+	private static final long serialVersionUID = -7208093607556457183L;
+	private final long numRecordsIn;
+	private final long numRecordsOut;
+
+	private final double numRecordsInPerSecond;
+	private final double numRecordsOutPerSecond;
+
+	private final long numBytesInLocal;
+	private final long numBytesInRemote;
+	private final long numBytesOut;
+
+	private final double numBytesInLocalPerSecond;
+	private final double numBytesInRemotePerSecond;
+	private final double numBytesOutPerSecond;
+
+	public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn, Meter bytesOut) {
+		this.numRecordsIn = recordsIn.getCount();
+		this.numRecordsInPerSecond = recordsIn.getRate();
+		this.numRecordsOut = recordsOut.getCount();
+		this.numRecordsOutPerSecond = recordsOut.getRate();
+		this.numBytesInLocal = bytesLocalIn.getCount();
+		this.numBytesInLocalPerSecond = bytesLocalIn.getRate();
+		this.numBytesInRemote = bytesRemoteIn.getCount();
+		this.numBytesInRemotePerSecond = bytesRemoteIn.getRate();
+		this.numBytesOut = bytesOut.getCount();
+		this.numBytesOutPerSecond = bytesOut.getRate();
+	}
+
+	public long getNumRecordsIn() {
+		return numRecordsIn;
+	}
+
+	public long getNumRecordsOut() {
+		return numRecordsOut;
+	}
+
+	public long getNumBytesInLocal() {
+		return numBytesInLocal;
+	}
+
+	public long getNumBytesInRemote() {
+		return numBytesInRemote;
+	}
+
+	public long getNumBytesInTotal() {
+		return numBytesInLocal + numBytesInRemote;
+	}
+
+	public long getNumBytesOut() {
+		return numBytesOut;
+	}
+
+	public double getNumRecordsInPerSecond() {
+		return numRecordsInPerSecond;
+	}
+
+	public double getNumRecordsOutPerSecond() {
+		return numRecordsOutPerSecond;
+	}
+
+	public double getNumBytesInLocalPerSecond() {
+		return numBytesInLocalPerSecond;
+	}
+
+	public double getNumBytesInRemotePerSecond() {
+		return numBytesInRemotePerSecond;
+	}
+
+	public double getNumBytesOutPerSecond() {
+		return numBytesOutPerSecond;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 32611fd..2e321fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -56,4 +56,21 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
 	public Meter getNumRecordsOutRate() {
 		return numRecordsOutRate;
 	}
+
+	/**
+	 * Causes the containing task to use this operator's input record counter.
+	 */
+	public void reuseInputMetricsForTask() {
+		TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup();
+		taskIO.reuseRecordsInputCounter(this.numRecordsIn);
+		
+	}
+
+	/**
+	 * Causes the containing task to use this operator's output record counter.
+	 */
+	public void reuseOutputMetricsForTask() {
+		TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup();
+		taskIO.reuseRecordsOutputCounter(this.numRecordsOut);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index b2884ec..4e32563 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -23,9 +23,14 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -36,10 +41,14 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	private final Counter numBytesOut;
 	private final Counter numBytesInLocal;
 	private final Counter numBytesInRemote;
+	private final SumCounter numRecordsIn;
+	private final SumCounter numRecordsOut;
 
 	private final Meter numBytesInRateLocal;
 	private final Meter numBytesInRateRemote;
 	private final Meter numBytesOutRate;
+	private final Meter numRecordsInRate;
+	private final Meter numRecordsOutRate;
 
 	private final MetricGroup buffers;
 
@@ -52,10 +61,21 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 		this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
 		this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
 		this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
+		this.numRecordsIn = counter("numRecordsIn", new SumCounter());
+		this.numRecordsOut = counter("numRecordsOut", new SumCounter());
+		this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
+		this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
 
 		this.buffers = addGroup("buffers");
 	}
 
+	public IOMetrics createSnapshot() {
+		return new IOMetrics(numRecordsInRate, numRecordsOutRate, numBytesInRateLocal, numBytesInRateRemote, numBytesOutRate);
+	}
+
+	// ============================================================================================
+	// Getters
+	// ============================================================================================
 	public Counter getNumBytesOutCounter() {
 		return numBytesOut;
 	}
@@ -68,11 +88,19 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 		return numBytesInRemote;
 	}
 
-	public Meter getNumBytesInRateLocalMeter() {
+	public Counter getNumRecordsInCounter() {
+		return numRecordsIn;
+	}
+
+	public Counter getNumRecordsOutCounter() {
+		return numRecordsOut;
+	}
+
+	public Meter getNumBytesInLocalRateMeter() {
 		return numBytesInRateLocal;
 	}
 
-	public Meter getNumBytesInRateRemoteMeter() {
+	public Meter getNumBytesInRemoteRateMeter() {
 		return numBytesInRateRemote;
 	}
 
@@ -134,6 +162,41 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 		}
 	}
 
+	// ============================================================================================
+	// Metric Reuse
+	// ============================================================================================
+	public void reuseRecordsInputCounter(Counter numRecordsInCounter) {
+		this.numRecordsIn.addCounter(numRecordsInCounter);
+	}
+
+	public void reuseRecordsOutputCounter(Counter numRecordsOutCounter) {
+		this.numRecordsOut.addCounter(numRecordsOutCounter);
+	}
+
+	/**
+	 * A {@link SimpleCounter} that can contain other {@link Counter}s. A call to {@link SumCounter#getCount()} returns
+	 * the sum of this counter and all contained counters.
+	 */
+	private static class SumCounter extends SimpleCounter {
+		private final List<Counter> internalCounters = new ArrayList<>();
+
+		SumCounter() {
+		}
+
+		public void addCounter(Counter toAdd) {
+			internalCounters.add(toAdd);
+		}
+
+		@Override
+		public long getCount() {
+			long sum = super.getCount();
+			for (Counter counter : internalCounters) {
+				sum += counter.getCount();
+			}
+			return sum;
+		}
+	}
+
 	/**
 	 * Input buffer pool usage gauge of a task
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 354dbac..e896639 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -243,6 +243,10 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 		String headName =  getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
 		this.metrics = getEnvironment().getMetricGroup()
 			.addOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
+		this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
+		if (config.getNumberOfChainedStubs() == 0) {
+			this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
+		}
 
 		// initialize the readers.
 		// this does not yet trigger any stream consuming or processing.
@@ -1306,6 +1310,10 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 				ct.setup(chainedStubConf, taskName, previous, containingTask, cl, executionConfig, accumulatorMap);
 				chainedTasksTarget.add(0, ct);
 
+				if (i == numChained - 1) {
+					ct.getIOMetrics().reuseOutputMetricsForTask();
+				}
+
 				previous = ct;
 			}
 			// the collector of the first in the chain is the collector for the task

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 4626b69..eb7999c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -109,6 +109,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
 		RuntimeContext ctx = createRuntimeContext();
 		final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+		((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
+		((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
 		
 		if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
 			((RichOutputFormat) this.format).setRuntimeContext(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 4dc3ef5..e89559d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -103,7 +103,11 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 
 		RuntimeContext ctx = createRuntimeContext();
 		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
+		((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
 		Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter();
+		if (this.config.getNumberOfChainedStubs() == 0) {
+			((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
+		}
 
 		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
 			((RichInputFormat) this.format).setRuntimeContext(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index cf62dfa..442a53c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
@@ -104,6 +105,9 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 	@Override
 	public abstract void collect(IT record);
 
+	public OperatorIOMetricGroup getIOMetrics() {
+		return this.metrics.getIOMetricGroup();
+	}
 	
 	protected RuntimeContext getUdfRuntimeContext() {
 		return this.udfContext;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
new file mode 100644
index 0000000..564a518
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TaskIOMetricGroupTest {
+	@Test
+	public void testTaskIOMetricGroup() {
+		TaskMetricGroup task = new UnregisteredTaskMetricsGroup();
+		TaskIOMetricGroup taskIO = task.getIOMetricGroup();
+
+		// test counter forwarding
+		assertNotNull(taskIO.getNumRecordsInCounter());
+		assertNotNull(taskIO.getNumRecordsOutCounter());
+
+		Counter c1 = new SimpleCounter();
+		c1.inc(32L);
+		Counter c2 = new SimpleCounter();
+		c2.inc(64L);
+
+		taskIO.reuseRecordsInputCounter(c1);
+		taskIO.reuseRecordsOutputCounter(c2);
+		assertEquals(32L, taskIO.getNumRecordsInCounter().getCount());
+		assertEquals(64L, taskIO.getNumRecordsOutCounter().getCount());
+
+		// test IOMetrics instantiation
+		taskIO.getNumBytesInLocalCounter().inc(100L);
+		taskIO.getNumBytesInRemoteCounter().inc(150L);
+		taskIO.getNumBytesOutCounter().inc(250L);
+		
+		IOMetrics io = taskIO.createSnapshot();
+		assertEquals(32L, io.getNumRecordsIn());
+		assertEquals(64L, io.getNumRecordsOut());
+		assertEquals(100L, io.getNumBytesInLocal());
+		assertEquals(150L, io.getNumBytesInRemote());
+		assertEquals(250L, io.getNumBytesOut());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 2d38fb9..dd4c55c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -69,6 +69,7 @@ public class StreamConfig implements Serializable {
 	private static final String OUT_STREAM_EDGES = "outStreamEdges";
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
 	private static final String OPERATOR_NAME = "operatorName";
+	private static final String CHAIN_END = "chainEnd";
 
 	private static final String CHECKPOINTING_ENABLED = "checkpointing";
 	private static final String CHECKPOINT_MODE = "checkpointMode";
@@ -478,6 +479,14 @@ public class StreamConfig implements Serializable {
 		return config.getBoolean(IS_CHAINED_VERTEX, false);
 	}
 
+	public void setChainEnd() {
+		config.setBoolean(CHAIN_END, true);
+	}
+
+	public boolean isChainEnd() {
+		return config.getBoolean(CHAIN_END, false);
+	}
+
 	@Override
 	public String toString() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 87fd7eb..8f9da8a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -231,6 +231,9 @@ public class StreamingJobGraphGenerator {
 				config.setChainIndex(chainIndex);
 				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 				chainedConfigs.get(startNodeId).put(currentNodeId, config);
+				if (chainableOutputs.isEmpty()) {
+					config.setChainEnd();
+				}
 			}
 
 			return transitiveOutEdges;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 5f0dd85..a659866 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -163,6 +163,12 @@ public abstract class AbstractStreamOperator<OUT>
 		
 		this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
 		this.output = new CountingOutput(output, ((OperatorMetricGroup) this.metrics).getIOMetricGroup().getNumRecordsOutCounter());
+		if (config.isChainStart()) {
+			((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseInputMetricsForTask();
+		}
+		if (config.isChainEnd()) {
+			((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
+		}
 		Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
 		int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
 		if (historySize <= 0) {