You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 10:06:37 UTC
[04/16] flink git commit: [FLINK-6898] [metrics] Limit size of operator component in metric name
[FLINK-6898] [metrics] Limit size of operator component in metric name
This closes #4109.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edb79b0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edb79b0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edb79b0a
Branch: refs/heads/master
Commit: edb79b0a8bb2b33e1a0470b99b11274ac0e9c673
Parents: 0858076
Author: zentol <ch...@apache.org>
Authored: Mon Jun 12 15:36:25 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200
----------------------------------------------------------------------
.../metrics/groups/AbstractMetricGroup.java | 2 +-
.../runtime/metrics/groups/TaskMetricGroup.java | 6 ++++++
.../metrics/groups/TaskMetricGroupTest.java | 19 +++++++++++++++++++
3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/edb79b0a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index c67c5ea..ab59977 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -63,7 +63,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> implements MetricGroup {
- private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/edb79b0a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index cb7aaa0..338fc20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -42,6 +42,8 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
+ static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
+
private final TaskIOMetricGroup ioMetrics;
/** The execution Id uniquely identifying the executed task represented by this metrics group. */
@@ -131,6 +133,10 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
// ------------------------------------------------------------------------
public OperatorMetricGroup addOperator(String name) {
+ if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
+ LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH);
+ name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
+ }
OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/flink/blob/edb79b0a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index 22b0a1a..47fc98b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -25,10 +25,12 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
@@ -150,6 +152,23 @@ public class TaskMetricGroupTest extends TestLogger {
registry.shutdown();
}
+ @Test
+ public void testOperatorNameTruncation() {
+ Configuration cfg = new Configuration();
+ cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME);
+ MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+ TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+ TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname");
+ TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new AbstractID(), new AbstractID(), "task", 0, 0);
+
+ String originalName = new String(new char[100]).replace("\0", "-");
+ OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName);
+
+ String storedName = operatorMetricGroup.getScopeComponents()[0];
+ Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length());
+ Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName);
+ }
+
private static class CountingMetricRegistry extends MetricRegistry {
private int counter = 0;