Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/22 09:05:56 UTC

[GitHub] [flink] zentol opened a new pull request, #19555: [FLINK-23659][metrics][prometheus] Cleanup code

zentol opened a new pull request, #19555:
URL: https://github.com/apache/flink/pull/19555

   General cleanup of the Prometheus reporter codebase. Removes various warnings, simplifies some tests, and reduces dependencies on runtime internals.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #19555: [FLINK-23659][metrics][prometheus] Cleanup code

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19555:
URL: https://github.com/apache/flink/pull/19555#discussion_r858357105


##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java:
##########
@@ -52,228 +43,159 @@
  * different subtasks.
  */
 class PrometheusReporterTaskScopeTest {
-    private static final String[] LABEL_NAMES = {
-        "job_id",
-        "task_id",
-        "task_attempt_id",
-        "host",
-        "task_name",
-        "task_attempt_num",
-        "job_name",
-        "tm_id",
-        "subtask_index"
-    };
+    private static final String[] LABEL_NAMES = {"label1", "label2"};
+    private static final String[] LABEL_VALUES_1 = new String[] {"value1_1", "value1_2"};
+    private static final String[] LABEL_VALUES_2 = new String[] {"value2_1", "value2_2"};
+    private static final String LOGICAL_SCOPE = "logical_scope";
+    private static final String METRIC_NAME = "myMetric";
+
+    private final MetricGroup metricGroup1 =
+            TestUtils.createTestMetricGroup(
+                    LOGICAL_SCOPE, TestUtils.toMap(LABEL_NAMES, LABEL_VALUES_1));
+    private final MetricGroup metricGroup2 =
+            TestUtils.createTestMetricGroup(
+                    LOGICAL_SCOPE, TestUtils.toMap(LABEL_NAMES, LABEL_VALUES_2));
 
-    private static final String TASK_MANAGER_HOST = "taskManagerHostName";
-    private static final String TASK_MANAGER_ID = "taskManagerId";
-    private static final String JOB_NAME = "jobName";
-    private static final String TASK_NAME = "taskName";
-    private static final int ATTEMPT_NUMBER = 0;
-    private static final int SUBTASK_INDEX_1 = 0;
-    private static final int SUBTASK_INDEX_2 = 1;
-
-    private final JobID jobId = new JobID();
-    private final JobVertexID taskId1 = new JobVertexID();
-    private final ExecutionAttemptID taskAttemptId1 = new ExecutionAttemptID();
-    private final String[] labelValues1 = {
-        jobId.toString(),
-        taskId1.toString(),
-        taskAttemptId1.toString(),
-        TASK_MANAGER_HOST,
-        TASK_NAME,
-        "" + ATTEMPT_NUMBER,
-        JOB_NAME,
-        TASK_MANAGER_ID,
-        "" + SUBTASK_INDEX_1
-    };
-    private final JobVertexID taskId2 = new JobVertexID();
-    private final ExecutionAttemptID taskAttemptId2 = new ExecutionAttemptID();
-    private final String[] labelValues2 = {
-        jobId.toString(),
-        taskId2.toString(),
-        taskAttemptId2.toString(),
-        TASK_MANAGER_HOST,
-        TASK_NAME,
-        "" + ATTEMPT_NUMBER,
-        JOB_NAME,
-        TASK_MANAGER_ID,
-        "" + SUBTASK_INDEX_2
-    };
-
-    private TaskMetricGroup taskMetricGroup1;
-    private TaskMetricGroup taskMetricGroup2;
-
-    private MetricRegistryImpl registry;
     private PrometheusReporter reporter;
 
     @BeforeEach
     void setupReporter() {
-        registry =
-                new MetricRegistryImpl(
-                        MetricRegistryTestUtils.defaultMetricRegistryConfiguration(),
-                        Collections.singletonList(createReporterSetup("test1", "9400-9500")));
-        reporter = (PrometheusReporter) registry.getReporters().get(0);
-
-        TaskManagerMetricGroup tmMetricGroup =
-                TaskManagerMetricGroup.createTaskManagerMetricGroup(
-                        registry, TASK_MANAGER_HOST, new ResourceID(TASK_MANAGER_ID));
-        taskMetricGroup1 =
-                tmMetricGroup
-                        .addJob(jobId, JOB_NAME)
-                        .addTask(
-                                taskId1,
-                                taskAttemptId1,
-                                TASK_NAME,
-                                SUBTASK_INDEX_1,
-                                ATTEMPT_NUMBER);
-
-        taskMetricGroup2 =
-                tmMetricGroup
-                        .addJob(jobId, JOB_NAME)
-                        .addTask(
-                                taskId2,
-                                taskAttemptId2,
-                                TASK_NAME,
-                                SUBTASK_INDEX_2,
-                                ATTEMPT_NUMBER);
+        reporter = new PrometheusReporter(NetUtils.getPortRangeFromString("9400-9500"));
     }
 
     @AfterEach
-    void shutdownRegistry() throws Exception {
-        if (registry != null) {
-            registry.shutdown().get();
+    void tearDown() {
+        if (reporter != null) {
+            reporter.close();
         }
     }
 
     @Test
-    void countersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
+    void countersCanBeAddedSeveralTimesIfTheyDifferInLabels() {
         Counter counter1 = new SimpleCounter();
         counter1.inc(1);
         Counter counter2 = new SimpleCounter();
         counter2.inc(2);
 
-        taskMetricGroup1.counter("my_counter", counter1);
-        taskMetricGroup2.counter("my_counter", counter2);
+        reporter.notifyOfAddedMetric(counter1, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(counter2, METRIC_NAME, metricGroup2);
 
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_1))
                 .isEqualTo(1.);
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues2))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_2))
                 .isEqualTo(2.);
     }
 
     @Test
-    void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
-        Gauge<Integer> gauge1 =
-                new Gauge<Integer>() {
-                    @Override
-                    public Integer getValue() {
-                        return 3;
-                    }
-                };
-        Gauge<Integer> gauge2 =
-                new Gauge<Integer>() {
-                    @Override
-                    public Integer getValue() {
-                        return 4;
-                    }
-                };
+    void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() {
+        Gauge<Integer> gauge1 = () -> 3;
+        Gauge<Integer> gauge2 = () -> 4;
 
-        taskMetricGroup1.gauge("my_gauge", gauge1);
-        taskMetricGroup2.gauge("my_gauge", gauge2);
+        reporter.notifyOfAddedMetric(gauge1, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(gauge2, METRIC_NAME, metricGroup2);
 
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues1))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_1))
                 .isEqualTo(3.);
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues2))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_2))
                 .isEqualTo(4.);
     }
 
     @Test
-    void metersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
-        Meter meter = new TestMeter();
+    void metersCanBeAddedSeveralTimesIfTheyDifferInLabels() {
+        Meter meter1 = new TestMeter(1, 1.0);
+        Meter meter2 = new TestMeter(2, 2.0);
 
-        taskMetricGroup1.meter("my_meter", meter);
-        taskMetricGroup2.meter("my_meter", meter);
+        reporter.notifyOfAddedMetric(meter1, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(meter2, METRIC_NAME, metricGroup2);
 
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues1))
-                .isEqualTo(5.);
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_1))
+                .isEqualTo(meter1.getRate());
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues2))
-                .isEqualTo(5.);
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_2))
+                .isEqualTo(meter2.getRate());
     }
 
     @Test
     void histogramsCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
-        Histogram histogram = new TestHistogram();
+        TestHistogram histogram1 = new TestHistogram();
+        histogram1.setCount(1);
+        TestHistogram histogram2 = new TestHistogram();
+        histogram2.setCount(2);
 
-        taskMetricGroup1.histogram("my_histogram", histogram);
-        taskMetricGroup2.histogram("my_histogram", histogram);
+        reporter.notifyOfAddedMetric(histogram1, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(histogram2, METRIC_NAME, metricGroup2);
 
         final String exportedMetrics = pollMetrics(reporter.getPort()).getBody();
-        assertThat(exportedMetrics)
-                .contains("subtask_index=\"0\",quantile=\"0.5\",} 0.5"); // histogram
-        assertThat(exportedMetrics)
-                .contains("subtask_index=\"1\",quantile=\"0.5\",} 0.5"); // histogram
+        assertThat(exportedMetrics).contains("label2=\"value1_2\",} 1.0"); // histogram
+        assertThat(exportedMetrics).contains("label2=\"value2_2\",} 2.0"); // histogram

Review Comment:
   ```suggestion
           assertThat(exportedMetrics).contains("label2=\"value1_2\",} 1.0");
           assertThat(exportedMetrics).contains("label2=\"value2_2\",} 2.0");
   ```
   nit: I know that the comments were also part of my proposal. But they really don't add any value...





[GitHub] [flink] zentol merged pull request #19555: [FLINK-23659][metrics][prometheus] Cleanup code

Posted by GitBox <gi...@apache.org>.
zentol merged PR #19555:
URL: https://github.com/apache/flink/pull/19555




[GitHub] [flink] XComp commented on a diff in pull request #19555: [FLINK-23659][metrics][prometheus] Cleanup code

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19555:
URL: https://github.com/apache/flink/pull/19555#discussion_r857409226


##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java:
##########
@@ -52,96 +44,30 @@
  * different subtasks.
  */
 class PrometheusReporterTaskScopeTest {
-    private static final String[] LABEL_NAMES = {
-        "job_id",
-        "task_id",
-        "task_attempt_id",
-        "host",
-        "task_name",
-        "task_attempt_num",
-        "job_name",
-        "tm_id",
-        "subtask_index"
-    };
+    private static final String[] LABEL_NAMES = {"label1", "label2"};
+    private static final String[][] LABEL_VALUES =

Review Comment:
   Why do we use `String[][]` here? We always access the label values by index, so we could just as well create two fields, `LABEL_VALUES_0` and `LABEL_VALUES_1` (or something similar). Auto-completion would work better in that case...
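The two-field layout suggested here can be sketched as follows; the later revision of the test uses `LABEL_VALUES_1` and `LABEL_VALUES_2` in this way. `toMap` below is a hypothetical helper that pairs each label name with its value. It is an illustration only and is not claimed to match Flink's `TestUtils.toMap`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: one named constant per label set instead of a String[][] accessed by index.
final class LabelFixtures {
    static final String[] LABEL_NAMES = {"label1", "label2"};
    static final String[] LABEL_VALUES_1 = {"value1_1", "value1_2"};
    static final String[] LABEL_VALUES_2 = {"value2_1", "value2_2"};

    private LabelFixtures() {}

    // Hypothetical helper: pairs names[i] with values[i] and preserves label order.
    static Map<String, String> toMap(String[] names, String[] values) {
        if (names.length != values.length) {
            throw new IllegalArgumentException(
                    "Expected " + names.length + " values but got " + values.length);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            result.put(names[i], values[i]);
        }
        return Collections.unmodifiableMap(result);
    }
}
```

Referring to `LABEL_VALUES_2` by name lets the IDE complete it and the compiler reject a misspelled name. An out-of-range `LABEL_VALUES[2]` would fail only at runtime.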



##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java:
##########
@@ -152,92 +78,80 @@ void countersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestExceptio
         Counter counter2 = new SimpleCounter();
         counter2.inc(2);
 
-        taskMetricGroup1.counter("my_counter", counter1);
-        taskMetricGroup2.counter("my_counter", counter2);
+        reporter.notifyOfAddedMetric(counter1, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(counter2, METRIC_NAME, metricGroup2);
 
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[0]))
                 .isEqualTo(1.);
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues2))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[1]))
                 .isEqualTo(2.);
     }
 
     @Test
     void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
-        Gauge<Integer> gauge1 =
-                new Gauge<Integer>() {
-                    @Override
-                    public Integer getValue() {
-                        return 3;
-                    }
-                };
-        Gauge<Integer> gauge2 =
-                new Gauge<Integer>() {
-                    @Override
-                    public Integer getValue() {
-                        return 4;
-                    }
-                };
+        Gauge<Integer> gauge1 = () -> 3;
+        Gauge<Integer> gauge2 = () -> 4;
 
-        taskMetricGroup1.gauge("my_gauge", gauge1);
-        taskMetricGroup2.gauge("my_gauge", gauge2);
+        reporter.notifyOfAddedMetric(gauge1, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(gauge2, METRIC_NAME, metricGroup2);
 
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues1))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[0]))
                 .isEqualTo(3.);
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues2))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[1]))
                 .isEqualTo(4.);
     }
 
     @Test
     void metersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
         Meter meter = new TestMeter();
 
-        taskMetricGroup1.meter("my_meter", meter);
-        taskMetricGroup2.meter("my_meter", meter);
+        reporter.notifyOfAddedMetric(meter, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(meter, METRIC_NAME, metricGroup2);
 
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues1))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[0]))
                 .isEqualTo(5.);
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues2))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[1]))
                 .isEqualTo(5.);
     }
 
     @Test
     void histogramsCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
         Histogram histogram = new TestHistogram();
 
-        taskMetricGroup1.histogram("my_histogram", histogram);
-        taskMetricGroup2.histogram("my_histogram", histogram);
+        reporter.notifyOfAddedMetric(histogram, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(histogram, METRIC_NAME, metricGroup2);
 
         final String exportedMetrics = pollMetrics(reporter.getPort()).getBody();
         assertThat(exportedMetrics)
-                .contains("subtask_index=\"0\",quantile=\"0.5\",} 0.5"); // histogram
+                .contains("label2=\"value1_2\",quantile=\"0.5\",} 0.5"); // histogram
         assertThat(exportedMetrics)
-                .contains("subtask_index=\"1\",quantile=\"0.5\",} 0.5"); // histogram
+                .contains("label2=\"value2_2\",quantile=\"0.5\",} 0.5"); // histogram

Review Comment:
   nit: What about setting actual values as well in the histograms to differentiate them?
   ```
   TestHistogram histogram0 = new TestHistogram();
   histogram0.setCount(10);
   
   TestHistogram histogram1 = new TestHistogram();
   histogram1.setCount(20);
   
   reporter.notifyOfAddedMetric(histogram0, METRIC_NAME, metricGroup1);
   reporter.notifyOfAddedMetric(histogram1, METRIC_NAME, metricGroup2);
   
   final String exportedMetrics = pollMetrics(reporter.getPort()).getBody();
   assertThat(exportedMetrics).contains("count{label1=\"value1_1\",label2=\"value1_2\",} 10");
   assertThat(exportedMetrics).contains("count{label1=\"value2_1\",label2=\"value2_2\",} 20");
   ```



##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java:
##########
@@ -137,21 +123,7 @@ private void assertThatGaugeIsExported(Metric metric, String name, String expect
             throws UnirestException {
         final String prometheusName = SCOPE_PREFIX + name;

Review Comment:
   I guess this variable is unused and no longer needed. `addMetricAndPollResponse` expects the plain metric name (judging by how it is used in `PrometheusReporterTaskScopeTest`), and `createExpectedPollResponse` adds the prefix itself...
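The following is a simplified, hypothetical stand-in for a helper like `createExpectedPollResponse`. The caller passes the plain metric name and the helper prepends the prefix, so a `prometheusName` computed in the caller serves no purpose. The `SCOPE_PREFIX` value and the single-sample-line output are assumptions; the real helper may also emit `# HELP`/`# TYPE` lines. The line shape, including the trailing comma before `}`, follows the strings asserted elsewhere in these tests.

```java
import java.util.Map;

final class ExpectedResponse {
    // Assumption: illustrative prefix only; the real test defines its own SCOPE_PREFIX.
    static final String SCOPE_PREFIX = "flink_logical_scope_";

    private ExpectedResponse() {}

    // Builds one sample line such as: prefix_name{label1="v1",label2="v2",} 1.0
    // Only the plain metric name is passed in; the prefix is added here.
    static String sampleLine(String metricName, Map<String, String> labels, double value) {
        StringBuilder sb = new StringBuilder(SCOPE_PREFIX).append(metricName).append('{');
        for (Map.Entry<String, String> label : labels.entrySet()) {
            sb.append(label.getKey()).append("=\"").append(label.getValue()).append("\",");
        }
        return sb.append("} ").append(value).toString();
    }
}
```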



##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java:
##########
@@ -152,92 +78,80 @@ void countersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestExceptio
         Counter counter2 = new SimpleCounter();
         counter2.inc(2);
 
-        taskMetricGroup1.counter("my_counter", counter1);
-        taskMetricGroup2.counter("my_counter", counter2);
+        reporter.notifyOfAddedMetric(counter1, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(counter2, METRIC_NAME, metricGroup2);
 
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[0]))
                 .isEqualTo(1.);
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues2))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[1]))
                 .isEqualTo(2.);
     }
 
     @Test
     void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
-        Gauge<Integer> gauge1 =
-                new Gauge<Integer>() {
-                    @Override
-                    public Integer getValue() {
-                        return 3;
-                    }
-                };
-        Gauge<Integer> gauge2 =
-                new Gauge<Integer>() {
-                    @Override
-                    public Integer getValue() {
-                        return 4;
-                    }
-                };
+        Gauge<Integer> gauge1 = () -> 3;
+        Gauge<Integer> gauge2 = () -> 4;
 
-        taskMetricGroup1.gauge("my_gauge", gauge1);
-        taskMetricGroup2.gauge("my_gauge", gauge2);
+        reporter.notifyOfAddedMetric(gauge1, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(gauge2, METRIC_NAME, metricGroup2);
 
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues1))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[0]))
                 .isEqualTo(3.);
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues2))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[1]))
                 .isEqualTo(4.);
     }
 
     @Test
     void metersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
         Meter meter = new TestMeter();
 
-        taskMetricGroup1.meter("my_meter", meter);
-        taskMetricGroup2.meter("my_meter", meter);
+        reporter.notifyOfAddedMetric(meter, METRIC_NAME, metricGroup1);
+        reporter.notifyOfAddedMetric(meter, METRIC_NAME, metricGroup2);
 
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues1))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[0]))
                 .isEqualTo(5.);
         assertThat(
                         CollectorRegistry.defaultRegistry.getSampleValue(
-                                "flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues2))
+                                getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[1]))
                 .isEqualTo(5.);

Review Comment:
   Same here: using different values makes the test better at detecting bugs:
   ```
   reporter.notifyOfAddedMetric(new TestMeter(10, 1), METRIC_NAME, metricGroup1);
   reporter.notifyOfAddedMetric(new TestMeter(10, 2), METRIC_NAME, metricGroup2);
   
   assertThat(
                   CollectorRegistry.defaultRegistry.getSampleValue(
                           getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[0]))
           .isEqualTo(1.);
   assertThat(
                   CollectorRegistry.defaultRegistry.getSampleValue(
                           getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES[1]))
           .isEqualTo(2.);
   ```
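The following toy model (not Flink code) shows why distinct values help. It contains a bug that swaps the values exported for the two label sets. With identical values, the per-label assertions still pass and the bug goes unnoticed. With distinct values, the assertions fail.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: two metrics registered under different label sets.
// When swapBug is true, each metric is wrongly reported under the other label set.
final class SwapDetection {
    private SwapDetection() {}

    static Map<String, Double> export(double value1, double value2, boolean swapBug) {
        Map<String, Double> exported = new HashMap<>();
        exported.put("value1_2", swapBug ? value2 : value1);
        exported.put("value2_2", swapBug ? value1 : value2);
        return exported;
    }

    // Mirrors the test's assertions: each label set must report its own metric's value.
    static boolean assertionsPass(double value1, double value2, boolean swapBug) {
        Map<String, Double> exported = export(value1, value2, swapBug);
        return exported.get("value1_2") == value1 && exported.get("value2_2") == value2;
    }
}
```

When both metrics report 5.0, as in the original meter test, `assertionsPass(5.0, 5.0, true)` is still true. When they report 1.0 and 2.0, the swap makes the check fail.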



##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java:
##########
@@ -192,32 +150,31 @@ void histogramIsReportedAsPrometheusSummary() throws UnirestException {
         }
     }
 
+    /**
+     * Metrics with the same name are stored by the reporter in a shared data-structure. This test
+     * ensures that a metric is unregistered from Prometheus even if other metrics with the same
+     * name still exist.
+     */
     @Test
-    void metricIsRemovedWhenCollectorIsNotUnregisteredYet() throws UnirestException {
-        JobManagerMetricGroup jmMetricGroup =
-                JobManagerMetricGroup.createJobManagerMetricGroup(registry, HOST_NAME);
-
+    void metricIsRemovedWhileOtherMetricsWithSameNameExist() throws UnirestException {
         String metricName = "metric";
 
         Counter metric1 = new SimpleCounter();
-        FrontMetricGroup<JobManagerJobMetricGroup> metricGroup1 =
-                new FrontMetricGroup<>(
-                        createReporterScopedSettings(),
-                        jmMetricGroup.addJob(JobID.generate(), "job_1"));
-        reporter.notifyOfAddedMetric(metric1, metricName, metricGroup1);
-
         Counter metric2 = new SimpleCounter();
-        FrontMetricGroup<JobManagerJobMetricGroup> metricGroup2 =
-                new FrontMetricGroup<>(
-                        createReporterScopedSettings(),
-                        jmMetricGroup.addJob(JobID.generate(), "job_2"));
-        reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2);
 
-        reporter.notifyOfRemovedMetric(metric1, metricName, metricGroup1);
+        final Map<String, String> variables2 = new HashMap<>(metricGroup.getAllVariables());
+        final Map.Entry<String, String> entryToModify = variables2.entrySet().iterator().next();
+        final String labelValueThatShouldBeRemoved = entryToModify.getValue();
+        variables2.put(entryToModify.getKey(), "some_value");
+        final MetricGroup metricGroup2 = TestUtils.createTestMetricGroup(LOGICAL_SCOPE, variables2);
+
+        reporter.notifyOfAddedMetric(metric1, metricName, metricGroup);
+        reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2);
+        reporter.notifyOfRemovedMetric(metric1, metricName, metricGroup);
 
         String response = pollMetrics(reporter.getPort()).getBody();
 
-        assertThat(response).doesNotContain("job_1");
+        assertThat(response).doesNotContain(labelValueThatShouldBeRemoved);

Review Comment:
   Why is it enough here to just check for the removal? Shouldn't we also check for the other metric not being touched? 🤔 
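One way to address this is to assert both directions: the removed label set is gone, and the other one is still present. The registry below is a hypothetical model of "metrics with the same name are stored in a shared data-structure" (from the test's Javadoc), implemented as a per-name map keyed by label values. It is not `PrometheusReporter`'s actual implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model: metrics sharing a name live in one per-name map keyed by label values.
final class SharedNameRegistry {
    private final Map<String, Map<List<String>, Double>> byName = new HashMap<>();

    void add(String name, List<String> labelValues, double value) {
        byName.computeIfAbsent(name, k -> new HashMap<>()).put(labelValues, value);
    }

    void remove(String name, List<String> labelValues) {
        Map<List<String>, Double> children = byName.get(name);
        if (children == null) {
            return;
        }
        children.remove(labelValues);
        // Drop the shared entry only after the last metric with this name is removed.
        if (children.isEmpty()) {
            byName.remove(name);
        }
    }

    boolean contains(String name, List<String> labelValues) {
        Map<List<String>, Double> children = byName.get(name);
        return children != null && children.containsKey(labelValues);
    }
}
```

In the actual test, the matching extra check could assert that the response still contains `"some_value"`, the label value that was changed for `metricGroup2`.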



##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java:
##########
@@ -48,53 +40,47 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
-import static org.apache.flink.metrics.prometheus.PrometheusReporterFactory.ARG_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Basic test for {@link PrometheusReporter}. */
 class PrometheusReporterTest {

Review Comment:
   nit: some of the test method signatures can be cleaned up: the declared exception is not always thrown...



##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java:
##########
@@ -271,27 +228,16 @@ void registeringSameMetricTwiceDoesNotThrowException() {
 
     @Test
     void cannotStartTwoReportersOnSamePort() throws Exception {
-        ReporterSetup setup1 = createReporterSetup("test1", portRangeProvider.next());
-
-        int usedPort = ((PrometheusReporter) setup1.getReporter()).getPort();
-
-        try {
-            assertThatThrownBy(() -> createReporterSetup("test2", String.valueOf(usedPort)))
-                    .isInstanceOf(Exception.class);
-        } finally {
-            setup1.getReporter().close();
-        }
+        assertThatThrownBy(
+                        () ->
+                                new PrometheusReporter(
+                                        Collections.singleton(reporter.getPort()).iterator()))
+                .isInstanceOf(Exception.class);
     }
 
     @Test
     void canStartTwoReportersWhenUsingPortRange() throws Exception {
-        String portRange = portRangeProvider.next();
-
-        ReporterSetup setup1 = createReporterSetup("test1", portRange);
-        ReporterSetup setup2 = createReporterSetup("test2", portRange);
-
-        setup1.getReporter().close();
-        setup2.getReporter().close();
+        new PrometheusReporter(portRangeProvider.next()).close();

Review Comment:
   I guess we're testing different things here: the old implementation used the same range for both reporters, while the new test implementation runs on two different ranges (the iterator increases the base of the next range by 100)...
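The following toy model of "bind to the first free port in a range" illustrates the difference. It opens no real sockets, and `PortRangeModel` is hypothetical. Two reporters that share one range must fall back to different ports, which is the case the old test covered. Two disjoint ranges never compete for a port, so they do not exercise that fallback. A single-port range that is already taken fails, as in `cannotStartTwoReportersOnSamePort`.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.IntStream;

// Toy model: tracks occupied ports in memory instead of binding real sockets.
final class PortRangeModel {
    private final Set<Integer> occupied = new HashSet<>();

    // Returns the first port from the range that is not yet occupied and marks it as used.
    int bindFirstFree(Iterator<Integer> range) {
        while (range.hasNext()) {
            int port = range.next();
            if (occupied.add(port)) {
                return port;
            }
        }
        throw new IllegalStateException("No free port in range");
    }

    static Iterator<Integer> range(int from, int toInclusive) {
        return IntStream.rangeClosed(from, toInclusive).iterator();
    }
}
```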



##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java:
##########
@@ -52,96 +44,30 @@
  * different subtasks.
  */
 class PrometheusReporterTaskScopeTest {

Review Comment:
   nit: The `UnirestException` declarations in the test signatures are obsolete in most of the tests...





[GitHub] [flink] flinkbot commented on pull request #19555: [FLINK-23659][metrics][prometheus] Cleanup code

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19555:
URL: https://github.com/apache/flink/pull/19555#issuecomment-1106223761

   ## CI report:
   
   * 81e659448db7c78ee65291996ada0fc9d48f4081 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

