Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2016/07/07 10:04:06 UTC

[GitHub] flink pull request #2210: [FLINK-4167] [metrics] Close IOMetricGroup in Task...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/2210

    [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricGroup

    This closes the `ioMetrics` metric group contained in `TaskMetricGroup`. Because this group was never closed, its metrics weren't deregistered after the end of a job.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixIOMetricsGroup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2210
    
----
commit c0f028f1e8605c332973afec2b5c1144a2815318
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-07-07T09:37:39Z

    [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricGroup

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    The culprit is `StreamInputProcessor.java:220`



[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    It gets an IOMG because in the past it would forward this group to its deserializers, for input byte counting etc. I didn't change it because I forgot about my own convention on how to use the IOMG.
    
    The IOMG only exists to keep details (mostly the getters) out of the TaskMG, and to prevent users of the IOMG from accessing other SubGroups of the TaskMG (separation of concerns). There is de facto no difference between a metric registered on a TaskMG and one registered on an IOMG.
    
    Thus, the underlying issue imo is that the IOMG extends AbstractMetricGroup, which gives it more functionality than it should have. Instead it should extend a class as outlined below.
    
    ```
    public abstract class SubMetricGroup<P extends MetricGroup> implements MetricGroup {
    	private P parent;
    	
    	public SubMetricGroup(P parent) {
    		this.parent = parent;
    	}
    
    	@Override
    	public Counter counter(int name) {
    		return parent.counter(name);
    	}
    
    	@Override
    	public Counter counter(String name) {
    		return parent.counter(name);
    	}
    
    	@Override
    	public <C extends Counter> C counter(int name, C counter) {
    		return parent.counter(name, counter);
    	}
    
    	@Override
    	public <C extends Counter> C counter(String name, C counter) {
    		return parent.counter(name, counter);
    	}
    
    	@Override
    	public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
    		return parent.gauge(name, gauge);
    	}
    
    	@Override
    	public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
    		return parent.gauge(name, gauge);
    	}
    
    	@Override
    	public <H extends Histogram> H histogram(String name, H histogram) {
    		return parent.histogram(name, histogram);
    	}
    
    	@Override
    	public <H extends Histogram> H histogram(int name, H histogram) {
    		return parent.histogram(name, histogram);
    	}
    
    	@Override
    	public MetricGroup addGroup(int name) {
    		return parent.addGroup(name);
    	}
    
    	@Override
    	public MetricGroup addGroup(String name) {
    		return parent.addGroup(name);
    	}
    }
    ```



[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    Why do we give the `StreamInputProcessor` a reference to the `IOMetricGroup` in the first place? Why not give it the corresponding `TaskMetricGroup`?



[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    Only had a small comment, otherwise +1.



[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    Ah ok, I see the underlying problem now. I will adapt my PR wrt your feedback.



[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    The problem was that the `currentLowWatermark` gauges weren't deregistered after the job terminated. This metric is registered by the `StreamInputProcessor` directly on the `IOMetricGroup` but never deregistered, because the `ioMetrics` group is never closed by the `TaskMetricGroup`.
    
    I guess that the problem is then that the `StreamInputProcessor` shouldn't register the `currentLowWatermark` metric. Instead, the IOMetricGroup should preregister the corresponding metric?
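
The leak described above can be reproduced in a minimal sketch. All types here (`SketchRegistry`, `SketchGroup`, `SketchTaskGroup`, `LeakSketch`) are hypothetical stand-ins written for illustration, not Flink's actual classes; the only assumption carried over from the discussion is that a group deregisters its own metrics when it is closed, and that the task-level group owns an `ioMetrics` sub-group.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for a metric registry; not Flink's actual class.
class SketchRegistry {
    final Map<String, Supplier<Long>> registered = new HashMap<>();
}

// A group that remembers what it registered and removes it again on close().
class SketchGroup {
    private final SketchRegistry registry;
    private final Set<String> ownNames = new HashSet<>();

    SketchGroup(SketchRegistry registry) {
        this.registry = registry;
    }

    void gauge(String name, Supplier<Long> gauge) {
        ownNames.add(name);
        registry.registered.put(name, gauge);
    }

    void close() {
        for (String name : ownNames) {
            registry.registered.remove(name);
        }
        ownNames.clear();
    }
}

// Task-level group that owns an I/O sub-group, mirroring TaskMetricGroup and ioMetrics.
class SketchTaskGroup extends SketchGroup {
    final SketchGroup ioMetrics;
    private final boolean closeIoMetrics;

    SketchTaskGroup(SketchRegistry registry, boolean closeIoMetrics) {
        super(registry);
        this.ioMetrics = new SketchGroup(registry);
        this.closeIoMetrics = closeIoMetrics;
    }

    @Override
    void close() {
        super.close();
        if (closeIoMetrics) {
            ioMetrics.close();
        }
    }
}

class LeakSketch {
    // Registers one gauge on the sub-group, closes the task group,
    // and returns how many metrics are still registered afterwards.
    static int leftoverAfterClose(boolean closeIoMetrics) {
        SketchRegistry registry = new SketchRegistry();
        SketchTaskGroup task = new SketchTaskGroup(registry, closeIoMetrics);
        task.ioMetrics.gauge("currentLowWatermark", () -> 42L);
        task.close();
        return registry.registered.size();
    }

    public static void main(String[] args) {
        System.out.println("sub-group not closed, leftover metrics: " + leftoverAfterClose(false));
        System.out.println("sub-group closed, leftover metrics: " + leftoverAfterClose(true));
    }
}
```

In this sketch the gauge survives the task group's `close()` unless the sub-group is closed as well, which is the behaviour the comment attributes to `currentLowWatermark`.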



[GitHub] flink pull request #2210: [FLINK-4167] [metrics] Close IOMetricGroup in Task...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2210#discussion_r70614548
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java ---
    @@ -0,0 +1,100 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.groups;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Metric group which forwards all registration calls to its parent metric group.
    + *
    + * @param <P> Type of the parent metric group
    + */
    +@Internal
    +public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
    +	private final P parentMetricGroup;
    +
    +	public ProxyMetricGroup(P parentMetricGroup) {
    +		this.parentMetricGroup = Preconditions.checkNotNull(parentMetricGroup);
    +	}
    +
    +	@Override
    +	public final void close() {
    +		parentMetricGroup.close();
    --- End diff --
    
    Hmm you're right, it is a bit tricky. You don't want to close the complete parent metric group. But I guess you would want to unregister the metrics registered by the `IOMetricGroup` if you call `IOMetricGroup#close`. However, this is not really possible at the moment. I will remove the `parent.close` call then.
    
    I agree that `close` and `isClosed` should only be used internally and not be exposed to the user. This could be a follow-up task.



[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    We could do that but it wouldn't be pretty. You would need a weird setValue() method on that Gauge that the StreamInputProcessor can use.
    
    Forwarding the register calls to the parent is the best solution i can come up with at the moment.
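
For comparison, the pre-registration alternative mentioned above might look roughly like the following sketch. `SketchGauge` is a local stand-in for a read-only gauge interface, and `SettableLongGauge` with its `setValue()` method is hypothetical; it only illustrates why a pre-registered gauge would need an extra write path for the component that produces the value.

```java
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in for a gauge interface with a single read method.
interface SketchGauge<T> {
    T getValue();
}

// Hypothetical gauge that is registered up front by the metric group
// and updated later by another component through setValue().
class SettableLongGauge implements SketchGauge<Long> {
    private final AtomicLong value = new AtomicLong(Long.MIN_VALUE);

    // Extra write path that a plain read-only gauge would not need.
    void setValue(long newValue) {
        value.set(newValue);
    }

    @Override
    public Long getValue() {
        return value.get();
    }
}

class SettableGaugeSketch {
    public static void main(String[] args) {
        SettableLongGauge lowWatermark = new SettableLongGauge();
        lowWatermark.setValue(1000L); // the producing component pushes its current value
        System.out.println(lowWatermark.getValue());
    }
}
```

Forwarding registration calls to the parent avoids this extra setter, because the producing component can keep registering a gauge that reads its own state.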



[GitHub] flink pull request #2210: [FLINK-4167] [metrics] Close IOMetricGroup in Task...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2210#discussion_r70597751
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java ---
    @@ -0,0 +1,100 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.metrics.groups;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.Gauge;
    +import org.apache.flink.metrics.Histogram;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Metric group which forwards all registration calls to its parent metric group.
    + *
    + * @param <P> Type of the parent metric group
    + */
    +@Internal
    +public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
    +	private final P parentMetricGroup;
    +
    +	public ProxyMetricGroup(P parentMetricGroup) {
    +		this.parentMetricGroup = Preconditions.checkNotNull(parentMetricGroup);
    +	}
    +
    +	@Override
    +	public final void close() {
    +		parentMetricGroup.close();
    --- End diff --
    
    It is safer to not do anything here. `close()` should only be called by the Task on the `TaskMetricGroup`; forwarding it would open up the possibility of components closing the TaskMG as well.
    
    There's also the looming StackOverflow when someone puts `ioMetrics.close()` into the `TaskMetricGroup#close()`.
    
    Now that I think about it, I believe `close()` (and by extension, `isClosed()`) has no business being in the MetricGroup interface in the first place, as users don't actually need to call it.
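
The recursion concern can be illustrated with a small sketch. The types below (`CloseableGroup`, `TaskGroupSketch`, `ForwardingProxySketch`, `NoOpProxySketch`) are hypothetical stand-ins, not the classes from the diff; the sketch assumes a task-level group whose `close()` also closes its I/O sub-group, and compares a proxy that forwards `close()` to its parent with one whose `close()` does nothing.

```java
// Minimal stand-in for the close() part of a metric group; not Flink's MetricGroup.
interface CloseableGroup {
    void close();
}

// Stand-in for a task-level group whose close() also closes its I/O sub-group.
class TaskGroupSketch implements CloseableGroup {
    CloseableGroup ioMetrics;
    int closeCalls = 0;

    @Override
    public void close() {
        closeCalls++;
        ioMetrics.close();
    }
}

// Proxy variant that forwards close() to its parent.
class ForwardingProxySketch implements CloseableGroup {
    private final CloseableGroup parent;

    ForwardingProxySketch(CloseableGroup parent) {
        this.parent = parent;
    }

    @Override
    public void close() {
        parent.close(); // the parent's close() calls back into this method
    }
}

// Proxy variant whose close() does nothing.
class NoOpProxySketch implements CloseableGroup {
    @Override
    public void close() {
        // intentionally empty: only the owner closes the parent group
    }
}

class CloseRecursionSketch {
    // Returns true if closing the task group ends in a StackOverflowError.
    static boolean closeOverflows(boolean forwardClose) {
        TaskGroupSketch task = new TaskGroupSketch();
        task.ioMetrics = forwardClose ? new ForwardingProxySketch(task) : new NoOpProxySketch();
        try {
            task.close();
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("forwarding close() overflows: " + closeOverflows(true));
        System.out.println("no-op close() overflows: " + closeOverflows(false));
    }
}
```

A guard flag in the parent's `close()` would also break the cycle, but the no-op variant additionally keeps other components from closing the parent through the proxy.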



[GitHub] flink pull request #2210: [FLINK-4167] [metrics] Close IOMetricGroup in Task...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2210



[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    Merging...



[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    I've addressed your comments @zentol and introduced a `ProxyMetricGroup` which simply forwards all calls as you've suggested.



[GitHub] flink issue #2210: [FLINK-4167] [metrics] Close IOMetricGroup in TaskMetricG...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2210
  
    Calling close() isn't necessary since no metric should ever be registered directly on the IOMetricGroup. This group is supposed to only contain pre-defined metrics that can be accessed from multiple places. These metrics are registered on the TaskMetricGroup, and thus cleaned up.
    
    A proper fix would be to override the addMetric() method so that it either throws an exception when a metric is registered directly on this group, or forwards the registration to the TaskMetricGroup.
    
    Addendum for clarification: TaskMetrics aren't technically cleaned up at the end of the job, but when no task for a given job runs on a TM.
    
    Given the current usage of the IOMetricGroup I sincerely doubt that it caused an actual issue, and thus would like you to provide more information on the issue you faced.
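
The two options for `addMetric()` mentioned above can be sketched as follows. `SketchAbstractGroup`, `RejectingIoGroup` and `DelegatingIoGroup` are hypothetical stand-ins; in particular, the `addMetric(String, Object)` signature is an assumption made for this sketch and is not taken from Flink's `AbstractMetricGroup`.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in base class that stores registered metrics by name.
class SketchAbstractGroup {
    final Map<String, Object> metrics = new HashMap<>();

    void addMetric(String name, Object metric) {
        metrics.put(name, metric);
    }
}

// Option 1: refuse any direct registration on the I/O group.
class RejectingIoGroup extends SketchAbstractGroup {
    @Override
    void addMetric(String name, Object metric) {
        throw new UnsupportedOperationException(
                "Metrics must not be registered directly on the I/O group: " + name);
    }
}

// Option 2: hand every registration over to the parent (task-level) group.
class DelegatingIoGroup extends SketchAbstractGroup {
    private final SketchAbstractGroup parent;

    DelegatingIoGroup(SketchAbstractGroup parent) {
        this.parent = parent;
    }

    @Override
    void addMetric(String name, Object metric) {
        parent.addMetric(name, metric);
    }
}

class AddMetricSketch {
    public static void main(String[] args) {
        SketchAbstractGroup task = new SketchAbstractGroup();
        DelegatingIoGroup io = new DelegatingIoGroup(task);
        io.addMetric("currentLowWatermark", 0L);
        // The metric ends up in the parent, so the parent's cleanup covers it.
        System.out.println("registered on parent: " + task.metrics.containsKey("currentLowWatermark"));
        System.out.println("registered on io group: " + io.metrics.containsKey("currentLowWatermark"));
    }
}
```

With the delegating variant, nothing is ever stored on the I/O group itself, so there is nothing for it to clean up; the rejecting variant instead makes a misplaced registration fail loudly.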

