You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/31 15:08:30 UTC
[2/5] flink git commit: [FLINK-4733] Reuse operator IO metrics for task
[FLINK-4733] Reuse operator IO metrics for task
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f4f6f92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f4f6f92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f4f6f92
Branch: refs/heads/master
Commit: 1f4f6f928cb71fcbcba8d5aca731e738352d1175
Parents: 99f1dc3
Author: zentol <ch...@apache.org>
Authored: Mon Oct 31 14:08:26 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Oct 31 15:12:02 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/IOMetrics.java | 99 ++++++++++++++++++++
.../metrics/groups/OperatorIOMetricGroup.java | 17 ++++
.../metrics/groups/TaskIOMetricGroup.java | 67 ++++++++++++-
.../flink/runtime/operators/BatchTask.java | 8 ++
.../flink/runtime/operators/DataSinkTask.java | 2 +
.../flink/runtime/operators/DataSourceTask.java | 4 +
.../operators/chaining/ChainedDriver.java | 4 +
.../metrics/groups/TaskIOMetricGroupTest.java | 61 ++++++++++++
.../flink/streaming/api/graph/StreamConfig.java | 9 ++
.../api/graph/StreamingJobGraphGenerator.java | 3 +
.../api/operators/AbstractStreamOperator.java | 6 ++
11 files changed, 278 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
new file mode 100644
index 0000000..15c54b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.Meter;
+
+import java.io.Serializable;
+
+/**
+ * An instance of this class represents a snapshot of the io-related metrics of a single task.
+ */
+public class IOMetrics implements Serializable {
+ private static final long serialVersionUID = -7208093607556457183L;
+ private final long numRecordsIn;
+ private final long numRecordsOut;
+
+ private final double numRecordsInPerSecond;
+ private final double numRecordsOutPerSecond;
+
+ private final long numBytesInLocal;
+ private final long numBytesInRemote;
+ private final long numBytesOut;
+
+ private final double numBytesInLocalPerSecond;
+ private final double numBytesInRemotePerSecond;
+ private final double numBytesOutPerSecond;
+
+ public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn, Meter bytesOut) {
+ this.numRecordsIn = recordsIn.getCount();
+ this.numRecordsInPerSecond = recordsIn.getRate();
+ this.numRecordsOut = recordsOut.getCount();
+ this.numRecordsOutPerSecond = recordsOut.getRate();
+ this.numBytesInLocal = bytesLocalIn.getCount();
+ this.numBytesInLocalPerSecond = bytesLocalIn.getRate();
+ this.numBytesInRemote = bytesRemoteIn.getCount();
+ this.numBytesInRemotePerSecond = bytesRemoteIn.getRate();
+ this.numBytesOut = bytesOut.getCount();
+ this.numBytesOutPerSecond = bytesOut.getRate();
+ }
+
+ public long getNumRecordsIn() {
+ return numRecordsIn;
+ }
+
+ public long getNumRecordsOut() {
+ return numRecordsOut;
+ }
+
+ public long getNumBytesInLocal() {
+ return numBytesInLocal;
+ }
+
+ public long getNumBytesInRemote() {
+ return numBytesInRemote;
+ }
+
+ public long getNumBytesInTotal() {
+ return numBytesInLocal + numBytesInRemote;
+ }
+
+ public long getNumBytesOut() {
+ return numBytesOut;
+ }
+
+ public double getNumRecordsInPerSecond() {
+ return numRecordsInPerSecond;
+ }
+
+ public double getNumRecordsOutPerSecond() {
+ return numRecordsOutPerSecond;
+ }
+
+ public double getNumBytesInLocalPerSecond() {
+ return numBytesInLocalPerSecond;
+ }
+
+ public double getNumBytesInRemotePerSecond() {
+ return numBytesInRemotePerSecond;
+ }
+
+ public double getNumBytesOutPerSecond() {
+ return numBytesOutPerSecond;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 32611fd..2e321fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -56,4 +56,21 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
public Meter getNumRecordsOutRate() {
return numRecordsOutRate;
}
+
+ /**
+ * Causes the containing task to use this operator's input record counter.
+ */
+ public void reuseInputMetricsForTask() {
+ TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup();
+ taskIO.reuseRecordsInputCounter(this.numRecordsIn);
+
+ }
+
+ /**
+ * Causes the containing task to use this operator's output record counter.
+ */
+ public void reuseOutputMetricsForTask() {
+ TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup();
+ taskIO.reuseRecordsOutputCounter(this.numRecordsOut);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index b2884ec..4e32563 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -23,9 +23,14 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+
+import java.util.ArrayList;
+import java.util.List;
/**
* Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -36,10 +41,14 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Counter numBytesOut;
private final Counter numBytesInLocal;
private final Counter numBytesInRemote;
+ private final SumCounter numRecordsIn;
+ private final SumCounter numRecordsOut;
private final Meter numBytesInRateLocal;
private final Meter numBytesInRateRemote;
private final Meter numBytesOutRate;
+ private final Meter numRecordsInRate;
+ private final Meter numRecordsOutRate;
private final MetricGroup buffers;
@@ -52,10 +61,21 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
+ this.numRecordsIn = counter("numRecordsIn", new SumCounter());
+ this.numRecordsOut = counter("numRecordsOut", new SumCounter());
+ this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
+ this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
this.buffers = addGroup("buffers");
}
+ public IOMetrics createSnapshot() {
+ return new IOMetrics(numRecordsInRate, numRecordsOutRate, numBytesInRateLocal, numBytesInRateRemote, numBytesOutRate);
+ }
+
+ // ============================================================================================
+ // Getters
+ // ============================================================================================
public Counter getNumBytesOutCounter() {
return numBytesOut;
}
@@ -68,11 +88,19 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
return numBytesInRemote;
}
- public Meter getNumBytesInRateLocalMeter() {
+ public Counter getNumRecordsInCounter() {
+ return numRecordsIn;
+ }
+
+ public Counter getNumRecordsOutCounter() {
+ return numRecordsOut;
+ }
+
+ public Meter getNumBytesInLocalRateMeter() {
return numBytesInRateLocal;
}
- public Meter getNumBytesInRateRemoteMeter() {
+ public Meter getNumBytesInRemoteRateMeter() {
return numBytesInRateRemote;
}
@@ -134,6 +162,41 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
}
}
+ // ============================================================================================
+ // Metric Reuse
+ // ============================================================================================
+ public void reuseRecordsInputCounter(Counter numRecordsInCounter) {
+ this.numRecordsIn.addCounter(numRecordsInCounter);
+ }
+
+ public void reuseRecordsOutputCounter(Counter numRecordsOutCounter) {
+ this.numRecordsOut.addCounter(numRecordsOutCounter);
+ }
+
+ /**
+ * A {@link SimpleCounter} that can contain other {@link Counter}s. A call to {@link SumCounter#getCount()} returns
+ * the sum of this counter and all contained counters.
+ */
+ private static class SumCounter extends SimpleCounter {
+ private final List<Counter> internalCounters = new ArrayList<>();
+
+ SumCounter() {
+ }
+
+ public void addCounter(Counter toAdd) {
+ internalCounters.add(toAdd);
+ }
+
+ @Override
+ public long getCount() {
+ long sum = super.getCount();
+ for (Counter counter : internalCounters) {
+ sum += counter.getCount();
+ }
+ return sum;
+ }
+ }
+
/**
* Input buffer pool usage gauge of a task
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 354dbac..e896639 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -243,6 +243,10 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
String headName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
this.metrics = getEnvironment().getMetricGroup()
.addOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
+ this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
+ if (config.getNumberOfChainedStubs() == 0) {
+ this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
+ }
// initialize the readers.
// this does not yet trigger any stream consuming or processing.
@@ -1306,6 +1310,10 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
ct.setup(chainedStubConf, taskName, previous, containingTask, cl, executionConfig, accumulatorMap);
chainedTasksTarget.add(0, ct);
+ if (i == numChained - 1) {
+ ct.getIOMetrics().reuseOutputMetricsForTask();
+ }
+
previous = ct;
}
// the collector of the first in the chain is the collector for the task
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 4626b69..eb7999c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -109,6 +109,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
RuntimeContext ctx = createRuntimeContext();
final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+ ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
+ ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
((RichOutputFormat) this.format).setRuntimeContext(ctx);
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 4dc3ef5..e89559d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -103,7 +103,11 @@ public class DataSourceTask<OT> extends AbstractInvokable {
RuntimeContext ctx = createRuntimeContext();
Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
+ ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter();
+ if (this.config.getNumberOfChainedStubs() == 0) {
+ ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
+ }
if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
((RichInputFormat) this.format).setRuntimeContext(ctx);
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index cf62dfa..442a53c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
@@ -104,6 +105,9 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
@Override
public abstract void collect(IT record);
+ public OperatorIOMetricGroup getIOMetrics() {
+ return this.metrics.getIOMetricGroup();
+ }
protected RuntimeContext getUdfRuntimeContext() {
return this.udfContext;
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
new file mode 100644
index 0000000..564a518
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TaskIOMetricGroupTest {
+ @Test
+ public void testTaskIOMetricGroup() {
+ TaskMetricGroup task = new UnregisteredTaskMetricsGroup();
+ TaskIOMetricGroup taskIO = task.getIOMetricGroup();
+
+ // test counter forwarding
+ assertNotNull(taskIO.getNumRecordsInCounter());
+ assertNotNull(taskIO.getNumRecordsOutCounter());
+
+ Counter c1 = new SimpleCounter();
+ c1.inc(32L);
+ Counter c2 = new SimpleCounter();
+ c2.inc(64L);
+
+ taskIO.reuseRecordsInputCounter(c1);
+ taskIO.reuseRecordsOutputCounter(c2);
+ assertEquals(32L, taskIO.getNumRecordsInCounter().getCount());
+ assertEquals(64L, taskIO.getNumRecordsOutCounter().getCount());
+
+ // test IOMetrics instantiation
+ taskIO.getNumBytesInLocalCounter().inc(100L);
+ taskIO.getNumBytesInRemoteCounter().inc(150L);
+ taskIO.getNumBytesOutCounter().inc(250L);
+
+ IOMetrics io = taskIO.createSnapshot();
+ assertEquals(32L, io.getNumRecordsIn());
+ assertEquals(64L, io.getNumRecordsOut());
+ assertEquals(100L, io.getNumBytesInLocal());
+ assertEquals(150L, io.getNumBytesInRemote());
+ assertEquals(250L, io.getNumBytesOut());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 2d38fb9..dd4c55c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -69,6 +69,7 @@ public class StreamConfig implements Serializable {
private static final String OUT_STREAM_EDGES = "outStreamEdges";
private static final String IN_STREAM_EDGES = "inStreamEdges";
private static final String OPERATOR_NAME = "operatorName";
+ private static final String CHAIN_END = "chainEnd";
private static final String CHECKPOINTING_ENABLED = "checkpointing";
private static final String CHECKPOINT_MODE = "checkpointMode";
@@ -478,6 +479,14 @@ public class StreamConfig implements Serializable {
return config.getBoolean(IS_CHAINED_VERTEX, false);
}
+ public void setChainEnd() {
+ config.setBoolean(CHAIN_END, true);
+ }
+
+ public boolean isChainEnd() {
+ return config.getBoolean(CHAIN_END, false);
+ }
+
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 87fd7eb..8f9da8a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -231,6 +231,9 @@ public class StreamingJobGraphGenerator {
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
+ if (chainableOutputs.isEmpty()) {
+ config.setChainEnd();
+ }
}
return transitiveOutEdges;
http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 5f0dd85..a659866 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -163,6 +163,12 @@ public abstract class AbstractStreamOperator<OUT>
this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
this.output = new CountingOutput(output, ((OperatorMetricGroup) this.metrics).getIOMetricGroup().getNumRecordsOutCounter());
+ if (config.isChainStart()) {
+ ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseInputMetricsForTask();
+ }
+ if (config.isChainEnd()) {
+ ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
+ }
Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
if (historySize <= 0) {