You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 10:06:37 UTC

[04/16] flink git commit: [FLINK-6898] [metrics] Limit size of operator component in metric name

[FLINK-6898] [metrics] Limit size of operator component in metric name

This closes #4109.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edb79b0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edb79b0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edb79b0a

Branch: refs/heads/master
Commit: edb79b0a8bb2b33e1a0470b99b11274ac0e9c673
Parents: 0858076
Author: zentol <ch...@apache.org>
Authored: Mon Jun 12 15:36:25 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200

----------------------------------------------------------------------
 .../metrics/groups/AbstractMetricGroup.java      |  2 +-
 .../runtime/metrics/groups/TaskMetricGroup.java  |  6 ++++++
 .../metrics/groups/TaskMetricGroupTest.java      | 19 +++++++++++++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/edb79b0a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index c67c5ea..ab59977 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -63,7 +63,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> implements MetricGroup {
 
-	private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
+	protected static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edb79b0a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index cb7aaa0..338fc20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -42,6 +42,8 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 
 	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
+	static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
+
 	private final TaskIOMetricGroup ioMetrics;
 
 	/** The execution Id uniquely identifying the executed task represented by this metrics group. */
@@ -131,6 +133,10 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 	// ------------------------------------------------------------------------
 
 	public OperatorMetricGroup addOperator(String name) {
+		if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
+			LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH);
+			name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
+		}
 		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);
 
 		synchronized (this) {

http://git-wip-us.apache.org/repos/asf/flink/blob/edb79b0a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index 22b0a1a..47fc98b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -25,10 +25,12 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -150,6 +152,23 @@ public class TaskMetricGroupTest extends TestLogger {
 		registry.shutdown();
 	}
 
+	@Test
+	public void testOperatorNameTruncation() {
+		Configuration cfg = new Configuration();
+		cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname");
+		TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new AbstractID(), new AbstractID(), "task", 0, 0);
+
+		String originalName = new String(new char[100]).replace("\0", "-");
+		OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName);
+
+		String storedName = operatorMetricGroup.getScopeComponents()[0];
+		Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length());
+		Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName);
+	}
+
 	private static class CountingMetricRegistry extends MetricRegistry {
 
 		private int counter = 0;