You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/21 07:06:21 UTC

[flink] branch master updated: [FLINK-30409] Return a new metric group when creating closed metric group

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 535f02c72f3 [FLINK-30409] Return a new metric group when creating closed metric group
535f02c72f3 is described below

commit 535f02c72f3dfdedd4f40a30cc27364a5f5c2cdf
Author: mas-chen <ma...@berkeley.edu>
AuthorDate: Mon Dec 19 12:27:40 2022 -0800

    [FLINK-30409] Return a new metric group when creating closed metric group
    
    This closes #21539
---
 .../metrics/groups/AbstractMetricGroup.java        |  4 ++--
 .../runtime/metrics/groups/MetricGroupTest.java    | 23 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index c2621a7f162..44eb8c2b251 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -461,8 +461,8 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
                 AbstractMetricGroup newGroup = createChildGroup(name, childType);
                 AbstractMetricGroup prior = groups.put(name, newGroup);
-                if (prior == null) {
-                    // no prior group with that name
+                if (prior == null || prior.isClosed()) {
+                    // no prior group or closed group with that name
                     return newGroup;
                 } else {
                     // had a prior group with that name, add the prior group back
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index b9d93419be8..848c7f26d56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -310,6 +310,29 @@ public class MetricGroupTest extends TestLogger {
         assertTrue(subgroup.isClosed());
     }
 
+    @Test
+    public void addClosedGroupReturnsNewGroupInstance() {
+        GenericMetricGroup mainGroup =
+                new GenericMetricGroup(
+                        exceptionOnRegister,
+                        new DummyAbstractMetricGroup(exceptionOnRegister),
+                        "mainGroup");
+
+        AbstractMetricGroup<?> subGroup = (AbstractMetricGroup<?>) mainGroup.addGroup("subGroup");
+
+        assertFalse(subGroup.isClosed());
+
+        subGroup.close();
+        assertTrue(subGroup.isClosed());
+
+        AbstractMetricGroup<?> newSubGroupWithSameNameAsClosedGroup =
+                (AbstractMetricGroup<?>) mainGroup.addGroup("subGroup");
+        assertFalse(
+                "The new subgroup should not be closed",
+                newSubGroupWithSameNameAsClosedGroup.isClosed());
+        assertTrue("The old sub group is not modified", subGroup.isClosed());
+    }
+
     @Test
     public void tolerateMetricNameCollisions() {
         final String name = "abctestname";