You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/22 09:13:51 UTC

flink git commit: [FLINK-4875] [metrics] Use correct operator name

Repository: flink
Updated Branches:
  refs/heads/master 227cdc829 -> b0753f193


[FLINK-4875] [metrics] Use correct operator name

This closes #2676.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0753f19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0753f19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0753f19

Branch: refs/heads/master
Commit: b0753f193cb7c8448547e254326911166d7b96a2
Parents: 227cdc8
Author: zentol <ch...@apache.org>
Authored: Thu Oct 20 15:53:03 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Oct 22 11:13:28 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/graph/StreamConfig.java  | 9 +++++++++
 .../streaming/api/graph/StreamingJobGraphGenerator.java     | 2 ++
 .../streaming/api/operators/AbstractStreamOperator.java     | 3 +--
 3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0753f19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 0dd1b37..ffe8456 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -68,6 +68,7 @@ public class StreamConfig implements Serializable {
 	private static final String EDGES_IN_ORDER = "edgesInOrder";
 	private static final String OUT_STREAM_EDGES = "outStreamEdges";
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
+	private static final String OPERATOR_NAME = "operatorName";
 
 	private static final String CHECKPOINTING_ENABLED = "checkpointing";
 	private static final String CHECKPOINT_MODE = "checkpointMode";
@@ -390,6 +391,14 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not instantiate configuration.", e);
 		}
 	}
+	
+	public void setOperatorName(String name) {
+		this.config.setString(OPERATOR_NAME,name);
+	}
+	
+	public String getOperatorName() {
+		return this.config.getString(OPERATOR_NAME, null);
+	}
 
 	public void setChainIndex(int index) {
 		this.config.setInteger(CHAIN_INDEX, index);

http://git-wip-us.apache.org/repos/asf/flink/blob/b0753f19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 824e375..1d99cf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -211,6 +211,7 @@ public class StreamingJobGraphGenerator {
 
 				config.setChainStart();
 				config.setChainIndex(0);
+				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 				config.setOutEdgesInOrder(transitiveOutEdges);
 				config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
 
@@ -228,6 +229,7 @@ public class StreamingJobGraphGenerator {
 					chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
 				}
 				config.setChainIndex(chainIndex);
+				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 				chainedConfigs.get(startNodeId).put(currentNodeId, config);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b0753f19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 82ce493..f2da9da 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -157,9 +157,8 @@ public abstract class AbstractStreamOperator<OUT>
 	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
 		this.container = containingTask;
 		this.config = config;
-		String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
 		
-		this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
+		this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
 		this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
 		Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
 		int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);