Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/07/10 11:42:37 UTC
[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/6292
[FLINK-9789][metrics] Ensure uniqueness of watermark metrics
## What is the purpose of the change
This PR ensures that each watermark metric object is registered only once. Gauges that are shared between metric groups must be wrapped and registered as separate objects to avoid collisions in reporters, which generally assume that metric objects are unique.
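To illustrate the kind of collision meant here, the following is a minimal sketch rather than Flink code. `Gauge`, `WatermarkGauge` and `ObjectKeyedReporter` are simplified, hypothetical stand-ins, and the metric names are placeholders. It assumes a reporter that keys its bookkeeping by the metric object itself, so registering the same object twice loses one of the two entries, while wrapping the gauge in a method reference yields a second, distinct object backed by the same value.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedGaugeSketch {

    /** Minimal stand-in for a gauge interface; not Flink's actual type. */
    interface Gauge<T> {
        T getValue();
    }

    /** Hypothetical gauge holding the current watermark. */
    static final class WatermarkGauge implements Gauge<Long> {
        private volatile long currentWatermark = Long.MIN_VALUE;

        void setCurrentWatermark(long watermark) {
            this.currentWatermark = watermark;
        }

        @Override
        public Long getValue() {
            return currentWatermark;
        }
    }

    /** Hypothetical reporter that uses the metric object as the map key. */
    static final class ObjectKeyedReporter {
        private final Map<Gauge<?>, String> names = new HashMap<>();

        void notifyOfAddedMetric(Gauge<?> gauge, String name) {
            // Registering the same object again overwrites the earlier entry.
            names.put(gauge, name);
        }

        int size() {
            return names.size();
        }
    }

    /** Registers one gauge object under two names; only one entry survives. */
    public static int registerShared() {
        WatermarkGauge gauge = new WatermarkGauge();
        ObjectKeyedReporter reporter = new ObjectKeyedReporter();
        reporter.notifyOfAddedMetric(gauge, "operator.watermark");
        reporter.notifyOfAddedMetric(gauge, "task.watermark");
        return reporter.size();
    }

    /** Registers the gauge and a wrapper around it; both entries survive. */
    public static int registerWrapped() {
        WatermarkGauge gauge = new WatermarkGauge();
        Gauge<Long> wrapper = gauge::getValue; // distinct object, same underlying value
        ObjectKeyedReporter reporter = new ObjectKeyedReporter();
        reporter.notifyOfAddedMetric(gauge, "operator.watermark");
        reporter.notifyOfAddedMetric(wrapper, "task.watermark");
        return reporter.size();
    }

    public static void main(String[] args) {
        System.out.println("shared:  " + registerShared());
        System.out.println("wrapped: " + registerWrapped());
    }
}
```

The wrapper adds one level of indirection per read but keeps a single source of truth for the watermark value.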
## Brief change log
* ensure uniqueness by wrapping re-used metrics in `[One|Two]InputStreamTask` and `OperatorChain`
* add `InterceptingTaskMetricGroup` to gather registered metrics
* modify `[One|Two]InputStreamTask#testWatermarkMetrics` to check uniqueness of registered objects, and cover re-use for tasks
## Verifying this change
* run `[One|Two]InputStreamTask#testWatermarkMetrics`
* run any streaming job and check existence of all watermark metrics for task and operator
* (via REST API: `/jobs/<job_id>/vertices/<vertex_id>/metrics`)
* see the [metrics documentation](https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#io) for a list of expected metrics
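As a rough aid for the manual check above, the sketch below only builds the request URI for the vertex metrics endpoint. The class and method names are hypothetical, the base address `http://localhost:8081` is assumed to be a local cluster's REST address, and the IDs and the metric name passed in `main` are placeholders that have to be replaced with real values from the running job.

```java
import java.net.URI;

public class WatermarkMetricsUri {

    /**
     * Builds the URI of the vertex metrics endpoint. If metric names are given,
     * they are passed as a comma-separated "get" query parameter.
     */
    public static URI vertexMetricsUri(
            String restBase, String jobId, String vertexId, String... metricNames) {
        String uri = restBase + "/jobs/" + jobId + "/vertices/" + vertexId + "/metrics";
        if (metricNames.length > 0) {
            uri += "?get=" + String.join(",", metricNames);
        }
        return URI.create(uri);
    }

    public static void main(String[] args) {
        // Placeholder IDs and metric name; substitute values from an actual job.
        System.out.println(
            vertexMetricsUri("http://localhost:8081", "someJobId", "someVertexId", "someMetricName"));
    }
}
```

Calling the endpoint without the `get` parameter lists the available metric names, which is enough to check that the watermark metrics exist.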
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 9789
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6292.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6292
----
commit 4a7a0c6114893250e625dc36d953209de22ce57e
Author: zentol <ch...@...>
Date: 2018-07-10T11:27:34Z
[FLINK-9789][metrics] Ensure uniqueness of watermark metrics
----
---
[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6292#discussion_r201347774
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---
@@ -93,7 +93,7 @@ public void init() throws Exception {
inputWatermarkGauge);
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
- getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
+ getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
--- End diff --
will add a comment while merging
---
[GitHub] flink issue #6292: [FLINK-9789][metrics] Ensure uniqueness of watermark metr...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/6292
Relying on `hashCode`/`equals` implementations would mean we could no longer use lambdas or method references for gauges, which IMO isn't a viable option.
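A small sketch of why this is the case, using only the JDK (`Counter` is a hypothetical stand-in for a gauge's backing object): lambdas and method references cannot override `equals`/`hashCode`, so two wrappers around the same target fall back to `Object` identity. The Java language specification does not guarantee whether repeated evaluation yields distinct objects, so the comparison below reflects what common JVMs do for capturing method references, not a language guarantee.

```java
import java.util.function.Supplier;

public class LambdaEqualitySketch {

    /** Hypothetical value holder standing in for a gauge's backing state. */
    static final class Counter {
        private long value = 7L;

        long get() {
            return value;
        }
    }

    /** Compares two method references that wrap the same target object. */
    public static boolean wrappersAreEqual() {
        Counter counter = new Counter();
        Supplier<Long> first = counter::get;
        Supplier<Long> second = counter::get;
        // equals() is inherited from Object, so this is an identity comparison.
        return first.equals(second);
    }

    /** Both wrappers still read the same underlying value. */
    public static boolean wrappersAgreeOnValue() {
        Counter counter = new Counter();
        Supplier<Long> first = counter::get;
        Supplier<Long> second = counter::get;
        return first.get().equals(second.get());
    }

    public static void main(String[] args) {
        System.out.println("equal wrappers: " + wrappersAreEqual());
        System.out.println("same value:     " + wrappersAgreeOnValue());
    }
}
```

A metrics system that deduplicated by `equals` would therefore need every gauge to be a named class with a hand-written implementation.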
---
[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6292#discussion_r201315827
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TaskMetricGroup} that exposes all registered metrics.
+ */
+public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup {
+
+ private Map<String, Metric> intercepted;
+
+ /**
+ * Returns the registered metric for the given name, or null if it was never registered.
+ *
+ * @param name metric name
+ * @return registered metric for the given name, or null if it was never registered
+ */
+ public Metric get(String name) {
+ return intercepted.get(name);
--- End diff --
That's true, but the same code exists in the `InterceptingOperatorMetricGroup`. We could simplify that but I'd do it in a separate ticket.
An NPE is virtually impossible unless the group is used in unusual ways (i.e. without being passed to a task).
---
[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6292#discussion_r201340299
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---
@@ -93,7 +93,7 @@ public void init() throws Exception {
inputWatermarkGauge);
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
- getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
+ getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
--- End diff --
Maybe document that we create a wrapping metric because registered metric objects have to be unique.
---
[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6292#discussion_r201358726
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java ---
@@ -0,0 +1,53 @@
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TaskMetricGroup} that exposes all registered metrics.
+ */
+public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup {
+
+ private Map<String, Metric> intercepted;
+
+ /**
+ * Returns the registered metric for the given name, or null if it was never registered.
+ *
+ * @param name metric name
+ * @return registered metric for the given name, or null if it was never registered
+ */
+ public Metric get(String name) {
+ return intercepted.get(name);
--- End diff --
correct.
---
[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6292#discussion_r201352126
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java ---
@@ -0,0 +1,53 @@
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TaskMetricGroup} that exposes all registered metrics.
+ */
+public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup {
+
+ private Map<String, Metric> intercepted;
+
+ /**
+ * Returns the registered metric for the given name, or null if it was never registered.
+ *
+ * @param name metric name
+ * @return registered metric for the given name, or null if it was never registered
+ */
+ public Metric get(String name) {
+ return intercepted.get(name);
--- End diff --
this is also only for testing, right?
---
[GitHub] flink issue #6292: [FLINK-9789][metrics] Ensure uniqueness of watermark metr...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/6292
+1 to merge this now but I think in the long run we should make the metrics system not rely on metrics objects being unique (or them correctly implementing `hashCode()`/`equals()`).
---
[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/6292#discussion_r201313023
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java ---
@@ -0,0 +1,53 @@
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TaskMetricGroup} that exposes all registered metrics.
+ */
+public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup {
+
+ private Map<String, Metric> intercepted;
+
+ /**
+ * Returns the registered metric for the given name, or null if it was never registered.
+ *
+ * @param name metric name
+ * @return registered metric for the given name, or null if it was never registered
+ */
+ public Metric get(String name) {
+ return intercepted.get(name);
--- End diff --
If this method is invoked before `addMetric`, `intercepted` will not have been initialized, so it seems this can trigger an NPE.
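One way the concern could be addressed is sketched below. The quoted diff is truncated before `addMetric`, so this is not the PR's code: `Metric` and `InterceptingGroup` are simplified, hypothetical stand-ins, and the lazy initialization inside `addMetric` is an assumption about how the real class behaves. The sketch keeps that lazy initialization and only guards the lookup, which matches the Javadoc contract of returning null for a name that was never registered.

```java
import java.util.HashMap;
import java.util.Map;

public class NullSafeInterceptSketch {

    /** Minimal stand-in for a metric marker interface; not Flink's actual type. */
    interface Metric {}

    /** Hypothetical simplified group that records every registered metric. */
    static final class InterceptingGroup {
        private Map<String, Metric> intercepted;

        /** Assumed behaviour: the map is created on first registration. */
        void addMetric(String name, Metric metric) {
            if (intercepted == null) {
                intercepted = new HashMap<>();
            }
            intercepted.put(name, metric);
        }

        /** Returns the registered metric, or null if none was registered under this name. */
        Metric get(String name) {
            return intercepted == null ? null : intercepted.get(name);
        }
    }

    /** Looks up a metric before anything was registered. */
    public static boolean lookupBeforeAddIsNull() {
        return new InterceptingGroup().get("missing") == null;
    }

    /** Registers a metric and reads the same object back. */
    public static boolean lookupAfterAddReturnsMetric() {
        InterceptingGroup group = new InterceptingGroup();
        Metric metric = new Metric() {};
        group.addMetric("present", metric);
        return group.get("present") == metric;
    }

    public static void main(String[] args) {
        System.out.println("before add: " + lookupBeforeAddIsNull());
        System.out.println("after add:  " + lookupAfterAddReturnsMetric());
    }
}
```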
---
[GitHub] flink pull request #6292: [FLINK-9789][metrics] Ensure uniqueness of waterma...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6292
---