You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/22 09:13:51 UTC
flink git commit: [FLINK-4875] [metrics] Use correct operator name
Repository: flink
Updated Branches:
refs/heads/master 227cdc829 -> b0753f193
[FLINK-4875] [metrics] Use correct operator name
This closes #2676.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0753f19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0753f19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0753f19
Branch: refs/heads/master
Commit: b0753f193cb7c8448547e254326911166d7b96a2
Parents: 227cdc8
Author: zentol <ch...@apache.org>
Authored: Thu Oct 20 15:53:03 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Oct 22 11:13:28 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/streaming/api/graph/StreamConfig.java | 9 +++++++++
.../streaming/api/graph/StreamingJobGraphGenerator.java | 2 ++
.../streaming/api/operators/AbstractStreamOperator.java | 3 +--
3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b0753f19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 0dd1b37..ffe8456 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -68,6 +68,7 @@ public class StreamConfig implements Serializable {
private static final String EDGES_IN_ORDER = "edgesInOrder";
private static final String OUT_STREAM_EDGES = "outStreamEdges";
private static final String IN_STREAM_EDGES = "inStreamEdges";
+ private static final String OPERATOR_NAME = "operatorName";
private static final String CHECKPOINTING_ENABLED = "checkpointing";
private static final String CHECKPOINT_MODE = "checkpointMode";
@@ -390,6 +391,14 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not instantiate configuration.", e);
}
}
+
+ public void setOperatorName(String name) {
+ this.config.setString(OPERATOR_NAME,name);
+ }
+
+ public String getOperatorName() {
+ return this.config.getString(OPERATOR_NAME, null);
+ }
public void setChainIndex(int index) {
this.config.setInteger(CHAIN_INDEX, index);
http://git-wip-us.apache.org/repos/asf/flink/blob/b0753f19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 824e375..1d99cf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -211,6 +211,7 @@ public class StreamingJobGraphGenerator {
config.setChainStart();
config.setChainIndex(0);
+ config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
@@ -228,6 +229,7 @@ public class StreamingJobGraphGenerator {
chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
}
config.setChainIndex(chainIndex);
+ config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0753f19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 82ce493..f2da9da 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -157,9 +157,8 @@ public abstract class AbstractStreamOperator<OUT>
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
this.container = containingTask;
this.config = config;
- String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
- this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
+ this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);