Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/28 02:37:31 UTC

[GitHub] [flink] masteryhx opened a new pull request, #19598: [FLINK-25470][changelog] Expose more metrics of materialization

masteryhx opened a new pull request, #19598:
URL: https://github.com/apache/flink/pull/19598

   ## What is the purpose of the change
   
   This pull request adds more materialization metrics for the changelog state backend:
   - numTotalMaterialization
   - numInProgressMaterialization
   - numCompletedMaterialization
   - numFailedMaterialization
   - lastFullSizeOfMaterialization
   - lastIncSizeOfMaterialization
   
   ## Brief change log
   
   - Added `ChangelogMetricGroup` to support reporting related metrics.
   - Moved `ExceptionallyDoneFuture` and `DelegatedStateBackendWrapper` to the runtime test module so they can be reused.
   
   ## Verifying this change
   
   - Added `ChangelogMetricGroupTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r861893623


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;
+    private final Counter completedMaterializationCounter;
+    private final Counter failedMaterializationCounter;
+    private final UpdatableGauge<Long> lastFullSizeOfMaterializationGauge;
+    private final UpdatableGauge<Long> lastIncSizeOfMaterializationGauge;
+
+    private final ReentrantLock metricsReadWriteLock = new ReentrantLock();
+
+    ChangelogMetricGroup(MetricGroup parentMetricGroup) {
+        super(parentMetricGroup);
+        this.totalMaterializationCounter = counter(NUMBER_OF_TOTAL_MATERIALIZATION);
+        this.inProgressMaterializationCounter = counter(NUMBER_OF_IN_PROGRESS_MATERIALIZATION);
+        this.completedMaterializationCounter = counter(NUMBER_OF_COMPLETED_MATERIALIZATION);
+        this.failedMaterializationCounter = counter(NUMBER_OF_FAILED_MATERIALIZATION);
+        this.lastFullSizeOfMaterializationGauge =
+                gauge(LATEST_FULL_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+        this.lastIncSizeOfMaterializationGauge =
+                gauge(LATEST_INC_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+    }
+
+    void reportPendingMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            inProgressMaterializationCounter.inc();
+            totalMaterializationCounter.inc();
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportCompletedMaterialization(
+            long fullSizeOfMaterialization, long incSizeOfMaterialization) {
+        metricsReadWriteLock.lock();
+        try {
+            completedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+            lastFullSizeOfMaterializationGauge.updateValue(fullSizeOfMaterialization);
+            lastIncSizeOfMaterializationGauge.updateValue(incSizeOfMaterialization);
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportFailedMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            failedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    private boolean canDecrementOfInProgressMaterializationNumber() {
+        boolean decrementLeadsToNegativeNumber =
+                inProgressMaterializationCounter.getCount() - 1 < 0;
+        if (decrementLeadsToNegativeNumber) {
+            String errorMessage =
+                    "Incremented the completed number of materialization "
+                            + "without incrementing the in progress checkpoints before.";
+            LOG.warn(errorMessage);
+        }
+        return !decrementLeadsToNegativeNumber;
+    }
+
+    /** A gauge supports to update the metric value. */
+    private interface UpdatableGauge<T> extends Gauge<T> {
+        void updateValue(T newValue);

Review Comment:
   I have removed it and now maintain the values as variables in the metric group.
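
One way to read this change, as a minimal sketch and not the code from the PR: the metric group keeps the latest sizes in plain fields and registers read-only gauges over them, so no separate updatable-gauge interface is needed. `SketchMetricRegistry`, `SketchSizeMetrics`, and `reportSnapshotSizes` are hypothetical names, and `Supplier<Long>` stands in for Flink's `Gauge<Long>`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a metric group; this is not Flink's MetricGroup. */
class SketchMetricRegistry {
    private final Map<String, Supplier<Long>> gauges = new LinkedHashMap<>();

    void gauge(String name, Supplier<Long> gauge) {
        gauges.put(name, gauge);
    }

    long read(String name) {
        return gauges.get(name).get();
    }
}

/** Keeps the latest sizes in fields and exposes them through read-only gauges. */
class SketchSizeMetrics {
    // volatile: assumed to be written by one thread and read by a reporter thread.
    private volatile long lastFullSize;
    private volatile long lastIncSize;

    SketchSizeMetrics(SketchMetricRegistry registry) {
        // The gauges only read the fields; updates go through reportSnapshotSizes.
        registry.gauge("lastFullSizeOfMaterialization", () -> lastFullSize);
        registry.gauge("lastIncSizeOfMaterialization", () -> lastIncSize);
    }

    void reportSnapshotSizes(long fullSize, long incSize) {
        lastFullSize = fullSize;
        lastIncSize = incSize;
    }
}
```

With this shape the gauge objects themselves stay immutable; only the fields they read change.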





[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r861889523


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -694,6 +705,10 @@ public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
         return keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
     }
 
+    public ChangelogMetricGroup getMetrics() {
+        return metrics;
+    }

Review Comment:
   I have split them into two classes and changed the places that invoke them.





[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r861908383


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";

Review Comment:
   Sure. I have modified them.





[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r867776186


##########
docs/content.zh/docs/ops/metrics.md:
##########
@@ -1277,7 +1277,7 @@ Note that the metrics are only available via reporters.
   </thead>
   <tbody>
     <tr>
-      <th rowspan="20"><strong>Job (only available on TaskManager)</strong></th>
+      <th rowspan="8"><strong>Job (only available on TaskManager)</strong></th>

Review Comment:
   Does what you pointed out apply only to the Chinese version? We are currently in content.zh, right?





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r867396693


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;
+    private final Counter completedMaterializationCounter;
+    private final Counter failedMaterializationCounter;
+    private final UpdatableGauge<Long> lastFullSizeOfMaterializationGauge;
+    private final UpdatableGauge<Long> lastIncSizeOfMaterializationGauge;
+
+    private final ReentrantLock metricsReadWriteLock = new ReentrantLock();
+
+    ChangelogMetricGroup(MetricGroup parentMetricGroup) {
+        super(parentMetricGroup);
+        this.totalMaterializationCounter = counter(NUMBER_OF_TOTAL_MATERIALIZATION);
+        this.inProgressMaterializationCounter = counter(NUMBER_OF_IN_PROGRESS_MATERIALIZATION);
+        this.completedMaterializationCounter = counter(NUMBER_OF_COMPLETED_MATERIALIZATION);
+        this.failedMaterializationCounter = counter(NUMBER_OF_FAILED_MATERIALIZATION);
+        this.lastFullSizeOfMaterializationGauge =
+                gauge(LATEST_FULL_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+        this.lastIncSizeOfMaterializationGauge =
+                gauge(LATEST_INC_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+    }
+
+    void reportPendingMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            inProgressMaterializationCounter.inc();
+            totalMaterializationCounter.inc();
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportCompletedMaterialization(
+            long fullSizeOfMaterialization, long incSizeOfMaterialization) {
+        metricsReadWriteLock.lock();
+        try {
+            completedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+            lastFullSizeOfMaterializationGauge.updateValue(fullSizeOfMaterialization);
+            lastIncSizeOfMaterializationGauge.updateValue(incSizeOfMaterialization);
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportFailedMaterialization() {
+        metricsReadWriteLock.lock();

Review Comment:
   Thanks!





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r860732625


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -694,6 +705,10 @@ public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
         return keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
     }
 
+    public ChangelogMetricGroup getMetrics() {
+        return metrics;
+    }

Review Comment:
   I think it makes sense to split `ChangelogMetricGroup` into two: one for the backend and one for the materializer. 
   Furthermore, after moving size reporting to `buildSnapshotResult` (as per the comment [below](https://github.com/apache/flink/pull/19598#discussion_r860735113)),
   the backend would only need to report these sizes, and the materializer would only report events such as started/completed/failed materialization.
   
   That would eliminate the dependency and synchronization issues and make the responsibilities of the metric group classes clearer.
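
A rough sketch of such a split, under the assumption that each side is updated only by its owner. The class and method names (`BackendSizeMetricsSketch`, `MaterializerEventMetricsSketch`, `reportStarted`, and so on) are hypothetical and are not taken from the PR; plain fields and `AtomicLong` stand in for Flink's gauges and counters.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical: size metrics, updated only by the backend. */
class BackendSizeMetricsSketch {
    private volatile long lastFullSize;
    private volatile long lastIncSize;

    void reportSnapshotSizes(long fullSize, long incSize) {
        lastFullSize = fullSize;
        lastIncSize = incSize;
    }

    long getLastFullSize() {
        return lastFullSize;
    }

    long getLastIncSize() {
        return lastIncSize;
    }
}

/** Hypothetical: event metrics, updated only by the materializer. */
class MaterializerEventMetricsSketch {
    private final AtomicLong started = new AtomicLong();
    private final AtomicLong completed = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    void reportStarted() {
        started.incrementAndGet();
    }

    void reportCompleted() {
        completed.incrementAndGet();
    }

    void reportFailed() {
        failed.incrementAndGet();
    }

    long getStarted() {
        return started.get();
    }

    /** Derived on read instead of being stored as a separate counter. */
    long getInProgress() {
        return started.get() - completed.get() - failed.get();
    }
}
```

Because neither class calls into the other, the backend no longer needs to expose its metric group to the materializer in this sketch.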





[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r860807435


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -694,6 +705,10 @@ public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
         return keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
     }
 
+    public ChangelogMetricGroup getMetrics() {
+        return metrics;
+    }

Review Comment:
   > the backend would only need report these sizes; and Materializer - events, such as started/completed/failed materialization.
   
   Maybe I could move the completed event into the materialization part, but the precise place where materialization starts is `initMaterialization`, which is where I currently report it (from the start of the sync materialization phase on).
    
   Or maybe we could just report the start before `initMaterialization`, even if no real materialization has been done yet?





[GitHub] [flink] flinkbot commented on pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19598:
URL: https://github.com/apache/flink/pull/19598#issuecomment-1111678743

   ## CI report:
   
   * 5186fe07239ea58284c8b640ba3775afd88842da UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r861891790


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;

Review Comment:
   I have removed it.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r867784366


##########
docs/content.zh/docs/ops/metrics.md:
##########
@@ -1277,7 +1277,7 @@ Note that the metrics are only available via reporters.
   </thead>
   <tbody>
     <tr>
-      <th rowspan="20"><strong>Job (only available on TaskManager)</strong></th>
+      <th rowspan="8"><strong>Job (only available on TaskManager)</strong></th>

Review Comment:
   Sorry, I meant that both versions should be synchronized (with regard to this edit).





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r860931294


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -694,6 +705,10 @@ public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
         return keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
     }
 
+    public ChangelogMetricGroup getMetrics() {
+        return metrics;
+    }

Review Comment:
   If I understand correctly, you are talking about where to call `reportPendingMaterialization()`: `ChangelogKeyedStateBackend.initMaterialization` or `PeriodicMaterializationManager.triggerMaterialization`.
   I think it doesn't matter much in terms of usefulness (on the one hand, it might be useful to know whether the materialization was scheduled at all; on the other hand, `mailboxExecutor` might delay it after triggering).
   So I think the latter (from `PeriodicMaterializationManager`) is fine.
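
To illustrate the trade-off, here is a small sketch in which the start is counted at trigger time while the actual init step waits in a queue. Everything in it is hypothetical: `TriggerTimeReportingSketch` and its queue are simple stand-ins, not the real `PeriodicMaterializationManager` or Flink's mailbox executor.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in showing reporting at trigger time rather than at init time. */
class TriggerTimeReportingSketch {
    // Stand-in for a mailbox: tasks run only when drainMailbox() is called.
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final AtomicLong startedMaterializations = new AtomicLong();
    private boolean initRan;

    void triggerMaterialization() {
        // Counted as soon as the materialization is scheduled...
        startedMaterializations.incrementAndGet();
        // ...while the init step may run later, whenever the mailbox gets to it.
        mailbox.add(() -> initRan = true);
    }

    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    long getStartedMaterializations() {
        return startedMaterializations.get();
    }

    boolean hasInitRun() {
        return initRan;
    }
}
```

In this sketch the counter can be ahead of the real work for as long as the queued task is pending, which is the delay mentioned above.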





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r860932007


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -216,6 +222,7 @@ public ChangelogKeyedStateBackend(
         this.executionConfig = executionConfig;
         this.ttlTimeProvider = ttlTimeProvider;
         this.keyValueStatesByName = new HashMap<>();
+        this.metrics = new ChangelogMetricGroup(metricGroup);

Review Comment:
   Yes.





[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r861908063


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java:
##########
@@ -180,9 +188,11 @@ private void asyncMaterializationPhase(
                             } else if (throwable instanceof CancellationException) {
                                 // can happen e.g. due to task cancellation
                                 LOG.info("materialization cancelled", throwable);
+                                metrics.reportFailedMaterialization();

Review Comment:
   I have removed it.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r867395555


##########
docs/content.zh/docs/ops/metrics.md:
##########
@@ -1277,7 +1277,7 @@ Note that the metrics are only available via reporters.
   </thead>
   <tbody>
     <tr>
-      <th rowspan="20"><strong>Job (only available on TaskManager)</strong></th>
+      <th rowspan="8"><strong>Job (only available on TaskManager)</strong></th>

Review Comment:
   This change should probably be replicated in the Chinese version.



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -672,12 +685,10 @@ public void updateChangelogSnapshotState(
                 subtaskName,
                 upTo,
                 materializedSnapshot);
+        List<KeyedStateHandle> materializedResult = getMaterializedResult(materializedSnapshot);

Review Comment:
   nit: this variable can be inlined back



##########
docs/content.zh/docs/ops/metrics.md:
##########
@@ -1317,6 +1317,42 @@ Note that the metrics are only available via reporters.
       <td>Current size of upload queue. Queue items can be packed together and form a single upload.</td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <th rowspan="7"><strong>Task/Operator</strong></th>
+      <td>startedMaterialization</td>
+      <td>The number of started materialization.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>completedMaterialization</td>
+      <td>The number of successfully completed materialization.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>failedMaterialization</td>
+      <td>The number of failed materialization.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>lastFullSizeOfMaterialization</td>
+      <td>The full size of the last materialization part (in bytes).</td>

Review Comment:
   This is actually the last reported checkpoint, not the last materialization, right?
   How about
   "The full size of the materialization part of the last reported checkpoint (in bytes)."
   ?
   
   ditto: other added metrics.



##########
docs/content.zh/docs/ops/metrics.md:
##########
@@ -1317,6 +1317,42 @@ Note that the metrics are only available via reporters.
       <td>Current size of upload queue. Queue items can be packed together and form a single upload.</td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <th rowspan="7"><strong>Task/Operator</strong></th>
+      <td>startedMaterialization</td>
+      <td>The number of started materialization.</td>

Review Comment:
   The number of started materialization[s].
   ?
   
   ditto: other added metrics



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java:
##########
@@ -132,8 +136,14 @@ public synchronized void close() {
     public void triggerMaterialization() {
         mailboxExecutor.execute(
                 () -> {
-                    Optional<MaterializationRunnable> materializationRunnableOptional =
-                            keyedStateBackend.initMaterialization();
+                    metrics.reportStartedMaterialization();
+                    Optional<MaterializationRunnable> materializationRunnableOptional;
+                    try {
+                        materializationRunnableOptional = keyedStateBackend.initMaterialization();
+                    } catch (Exception ex) {
+                        metrics.reportFailedMaterialization();
+                        throw ex;
+                    }
 
                     if (materializationRunnableOptional.isPresent()) {

Review Comment:
   In the opposite case (optional not present), shouldn't completed materialization be reported? Or maybe empty materialization?
   My reasoning is that with no state updates, the number of started materializations might be much higher than the sum of completed and failed.
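
   The accounting concern above can be illustrated with a minimal sketch. The class `MaterializationCounters` and its `trigger` method are hypothetical and are not part of the PR; the sketch assumes that an absent runnable is counted as completed, which is only one of the two options raised in the comment. Unlike the PR, the sketch swallows the exception instead of rethrowing it.

```java
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for the materialization counters; not Flink's actual class. */
class MaterializationCounters {
    final LongAdder started = new LongAdder();
    final LongAdder completed = new LongAdder();
    final LongAdder failed = new LongAdder();

    /** Counts one trigger; an absent runnable is treated as an immediately completed run. */
    void trigger(Optional<Runnable> materialization) {
        started.increment();
        if (!materialization.isPresent()) {
            // Nothing to materialize: still close the accounting for this trigger,
            // so that started == completed + failed once nothing is in flight.
            completed.increment();
            return;
        }
        try {
            materialization.get().run();
            completed.increment();
        } catch (RuntimeException e) {
            // Assumption of this sketch: the exception is swallowed (the PR rethrows it).
            failed.increment();
        }
    }
}
```

   With this bookkeeping, the started count no longer drifts away from the sum of completed and failed when there are no state updates.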



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -387,10 +393,17 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
                                         buildSnapshotResult(
                                                 checkpointId,
                                                 delta,
-                                                changelogStateBackendStateCopy)));
+                                                changelogStateBackendStateCopy))
+                        .whenComplete(
+                                (snapshotResult, throwable) ->
+                                        metrics.reportSnapshotResult(snapshotResult))

Review Comment:
   This can be executed by a writer thread (upon finishing the upload);
   or by the task thread, if the uploads are already finished.
   
   So this call should **probably** be synchronized.
   
   The common approach in Flink is to not synchronize metric writes with reads, and allow reporting stale metrics. But here, the update might happen from a different thread, which might lead to inconsistencies (e.g. mixing sizes from different checkpoints).
   
   Adding synchronization shouldn't incur a performance penalty, as this code executes infrequently.
   
   So how about marking the `reportSnapshotResult` method `synchronized`?
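
   A minimal sketch of that suggestion, assuming two plain `long` fields in place of the real gauges. The class `SnapshotSizeMetrics` and the `read` method are hypothetical; `read` is synchronized here only so that a consistent pair can be observed, whereas the comment proposes synchronizing the write.

```java
/** Hypothetical holder for the sizes of the last reported snapshot; names are illustrative. */
class SnapshotSizeMetrics {
    private long lastFullSize;
    private long lastIncSize;

    /** Writer side: both fields change under one lock, so they always belong to one snapshot. */
    synchronized void reportSnapshotResult(long fullSize, long incSize) {
        lastFullSize = fullSize;
        lastIncSize = incSize;
    }

    /** Reader side (assumption of this sketch): returns a consistent {full, incremental} pair. */
    synchronized long[] read() {
        return new long[] {lastFullSize, lastIncSize};
    }

    public static void main(String[] args) {
        SnapshotSizeMetrics metrics = new SnapshotSizeMetrics();
        metrics.reportSnapshotResult(1024L, 128L);
        long[] pair = metrics.read();
        System.out.println("full=" + pair[0] + ", incremental=" + pair[1]);
    }
}
```

   Because the method is called once per snapshot, the monitor acquisition is unlikely to be measurable.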



##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.function.FunctionWithException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+
+/** This class contains test utils of {@link StateBackend} */
+public class StateBackendTestUtils {

Review Comment:
   Shouldn't this class be a singleton, and have a private constructor?
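
   Assuming the comment refers to the usual non-instantiable utility-class pattern, a minimal sketch could look as follows. `TestUtilsSketch` and `doubled` are hypothetical names and do not mirror the contents of `StateBackendTestUtils`.

```java
/** Hypothetical utility class: only static helpers, no instances. */
final class TestUtilsSketch {

    /** Private constructor prevents instantiation from other classes. */
    private TestUtilsSketch() {
        throw new AssertionError("utility class, do not instantiate");
    }

    /** Placeholder static helper, present only so the class has some content. */
    static int doubled(int value) {
        return 2 * value;
    }
}
```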





[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r860786759


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -216,6 +222,7 @@ public ChangelogKeyedStateBackend(
         this.executionConfig = executionConfig;
         this.ttlTimeProvider = ttlTimeProvider;
         this.keyValueStatesByName = new HashMap<>();
+        this.metrics = new ChangelogMetricGroup(metricGroup);

Review Comment:
   Do you mean passing `ChangelogMetricGroup` as a constructor parameter of `ChangelogKeyedStateBackend`?





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r860933184


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;
+    private final Counter completedMaterializationCounter;
+    private final Counter failedMaterializationCounter;
+    private final UpdatableGauge<Long> lastFullSizeOfMaterializationGauge;
+    private final UpdatableGauge<Long> lastIncSizeOfMaterializationGauge;
+
+    private final ReentrantLock metricsReadWriteLock = new ReentrantLock();
+
+    ChangelogMetricGroup(MetricGroup parentMetricGroup) {
+        super(parentMetricGroup);
+        this.totalMaterializationCounter = counter(NUMBER_OF_TOTAL_MATERIALIZATION);
+        this.inProgressMaterializationCounter = counter(NUMBER_OF_IN_PROGRESS_MATERIALIZATION);
+        this.completedMaterializationCounter = counter(NUMBER_OF_COMPLETED_MATERIALIZATION);
+        this.failedMaterializationCounter = counter(NUMBER_OF_FAILED_MATERIALIZATION);
+        this.lastFullSizeOfMaterializationGauge =
+                gauge(LATEST_FULL_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+        this.lastIncSizeOfMaterializationGauge =
+                gauge(LATEST_INC_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+    }
+
+    void reportPendingMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            inProgressMaterializationCounter.inc();
+            totalMaterializationCounter.inc();
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportCompletedMaterialization(
+            long fullSizeOfMaterialization, long incSizeOfMaterialization) {
+        metricsReadWriteLock.lock();
+        try {
+            completedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+            lastFullSizeOfMaterializationGauge.updateValue(fullSizeOfMaterialization);
+            lastIncSizeOfMaterializationGauge.updateValue(incSizeOfMaterialization);
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportFailedMaterialization() {
+        metricsReadWriteLock.lock();

Review Comment:
   The number of failures can still be incremented by different threads IIUC.





[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r861907199


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java:
##########
@@ -132,8 +135,13 @@ public synchronized void close() {
     public void triggerMaterialization() {
         mailboxExecutor.execute(
                 () -> {
-                    Optional<MaterializationRunnable> materializationRunnableOptional =
-                            keyedStateBackend.initMaterialization();
+                    Optional<MaterializationRunnable> materializationRunnableOptional;
+                    try {
+                        materializationRunnableOptional = keyedStateBackend.initMaterialization();
+                    } catch (Exception ex) {
+                        metrics.reportFailedMaterialization();
+                        throw ex;

Review Comment:
   I think it may be useful for users to look at the metric history and find the materialization failure even after the job has been restored and has run for some time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r861899241


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;
+    private final Counter completedMaterializationCounter;
+    private final Counter failedMaterializationCounter;
+    private final UpdatableGauge<Long> lastFullSizeOfMaterializationGauge;
+    private final UpdatableGauge<Long> lastIncSizeOfMaterializationGauge;
+
+    private final ReentrantLock metricsReadWriteLock = new ReentrantLock();
+
+    ChangelogMetricGroup(MetricGroup parentMetricGroup) {
+        super(parentMetricGroup);
+        this.totalMaterializationCounter = counter(NUMBER_OF_TOTAL_MATERIALIZATION);
+        this.inProgressMaterializationCounter = counter(NUMBER_OF_IN_PROGRESS_MATERIALIZATION);
+        this.completedMaterializationCounter = counter(NUMBER_OF_COMPLETED_MATERIALIZATION);
+        this.failedMaterializationCounter = counter(NUMBER_OF_FAILED_MATERIALIZATION);
+        this.lastFullSizeOfMaterializationGauge =
+                gauge(LATEST_FULL_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+        this.lastIncSizeOfMaterializationGauge =
+                gauge(LATEST_INC_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+    }
+
+    void reportPendingMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            inProgressMaterializationCounter.inc();
+            totalMaterializationCounter.inc();
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportCompletedMaterialization(
+            long fullSizeOfMaterialization, long incSizeOfMaterialization) {
+        metricsReadWriteLock.lock();
+        try {
+            completedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+            lastFullSizeOfMaterializationGauge.updateValue(fullSizeOfMaterialization);
+            lastIncSizeOfMaterializationGauge.updateValue(incSizeOfMaterialization);
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportFailedMaterialization() {
+        metricsReadWriteLock.lock();

Review Comment:
   I think the ThreadSafeCounter is enough for this, so I just moved it into the metrics module.
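
   To illustrate why a thread-safe counter can replace the explicit lock for a single increment, here is a minimal sketch built on the JDK's `LongAdder`. `FailureCounterSketch` is a hypothetical stand-in and is not the counter class mentioned above.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical failure counter that is safe to increment from several threads. */
class FailureCounterSketch {
    private final LongAdder failed = new LongAdder();

    /** May be called concurrently from the task thread and from writer threads. */
    void reportFailedMaterialization() {
        failed.increment();
    }

    /** Returns the current total; it may lag slightly while increments are in flight. */
    long getCount() {
        return failed.sum();
    }
}
```

   This covers only independent increments; values that must stay consistent with each other (such as the two sizes of one snapshot) still need a common lock.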





[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r861889843


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -672,14 +680,17 @@ public void updateChangelogSnapshotState(
                 subtaskName,
                 upTo,
                 materializedSnapshot);
+        List<KeyedStateHandle> materializedResult = getMaterializedResult(materializedSnapshot);
         changelogSnapshotState =
                 new ChangelogSnapshotState(
-                        getMaterializedResult(materializedSnapshot),
-                        Collections.emptyList(),
-                        upTo,
-                        materializationID);
+                        materializedResult, Collections.emptyList(), upTo, materializationID);
 
         stateChangelogWriter.truncate(upTo);
+        long fullSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getStateSize).sum();
+        long incSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getCheckpointedSize).sum();
+        metrics.reportCompletedMaterialization(fullSizeOfMaterialization, incSizeOfMaterialization);

Review Comment:
   I agree with you.





[GitHub] [flink] rkhachatryan commented on pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #19598:
URL: https://github.com/apache/flink/pull/19598#issuecomment-1124968458

   The build failure is unrelated; the failed stage [succeeded](https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1570&view=logs&s=ae4f8708-9994-57d3-c2d7-b892156e7812&j=0a15d512-44ac-5ba5-97ab-13a5d066c22c) in my private branch. Merging.




[GitHub] [flink] rkhachatryan merged pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan merged PR #19598:
URL: https://github.com/apache/flink/pull/19598




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r860732625


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -694,6 +705,10 @@ public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
         return keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
     }
 
+    public ChangelogMetricGroup getMetrics() {
+        return metrics;
+    }

Review Comment:
   I think it makes sense to split `ChangelogMetricGroup` into two: one for the backend and one for the materializer.
   Furthermore, after moving size reporting to `buildSnapshotResult` (as per comment above),
   the backend would only need to report these sizes, and the materializer only events such as started/completed/failed materialization.
   
   That would eliminate the dependency and synchronization issues, and make the responsibilities of the metric group classes clearer.
   
   WDYT?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -672,14 +680,17 @@ public void updateChangelogSnapshotState(
                 subtaskName,
                 upTo,
                 materializedSnapshot);
+        List<KeyedStateHandle> materializedResult = getMaterializedResult(materializedSnapshot);
         changelogSnapshotState =
                 new ChangelogSnapshotState(
-                        getMaterializedResult(materializedSnapshot),
-                        Collections.emptyList(),
-                        upTo,
-                        materializationID);
+                        materializedResult, Collections.emptyList(), upTo, materializationID);
 
         stateChangelogWriter.truncate(upTo);
+        long fullSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getStateSize).sum();
+        long incSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getCheckpointedSize).sum();

Review Comment:
   Maybe move this calculation to the metrics group class?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -216,6 +222,7 @@ public ChangelogKeyedStateBackend(
         this.executionConfig = executionConfig;
         this.ttlTimeProvider = ttlTimeProvider;
         this.keyValueStatesByName = new HashMap<>();
+        this.metrics = new ChangelogMetricGroup(metricGroup);

Review Comment:
   I think it's better to inject `ChangelogMetricGroup` into constructor to make it more testable.
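
   A minimal sketch of what constructor injection buys in tests. All names here (`SnapshotMetrics`, `BackendSketch`, `RecordingMetrics`) are hypothetical and only mimic the shape of the real classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical narrow view of the metric group that the backend depends on. */
interface SnapshotMetrics {
    void reportSnapshotResult(long fullSize, long incSize);
}

/** Hypothetical backend that receives its metrics from the caller instead of building them. */
class BackendSketch {
    private final SnapshotMetrics metrics;

    BackendSketch(SnapshotMetrics metrics) {
        this.metrics = metrics;
    }

    /** Pretends to take a snapshot and reports its sizes through the injected metrics. */
    void snapshot(long fullSize, long incSize) {
        metrics.reportSnapshotResult(fullSize, incSize);
    }
}

/** Test double that records every reported pair. */
class RecordingMetrics implements SnapshotMetrics {
    final List<long[]> reports = new ArrayList<>();

    @Override
    public void reportSnapshotResult(long fullSize, long incSize) {
        reports.add(new long[] {fullSize, incSize});
    }
}
```

   A test can then pass a `RecordingMetrics` instance and assert on `reports` without setting up a real metric registry.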



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java:
##########
@@ -180,9 +188,11 @@ private void asyncMaterializationPhase(
                             } else if (throwable instanceof CancellationException) {
                                 // can happen e.g. due to task cancellation
                                 LOG.info("materialization cancelled", throwable);
+                                metrics.reportFailedMaterialization();

Review Comment:
   This is not a failure (although in-progress counter/gauge has to be changed I guess).
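
   The distinction can be sketched as follows: a cancelled run leaves the in-progress count but is counted neither as completed nor as failed. `MaterializationOutcomeSketch` and its methods are hypothetical and are not the PR's code.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical bookkeeping for materialization outcomes. */
class MaterializationOutcomeSketch {
    final LongAdder inProgress = new LongAdder();
    final LongAdder completed = new LongAdder();
    final LongAdder failed = new LongAdder();

    /** Called when a materialization starts. */
    void start() {
        inProgress.increment();
    }

    /** Called when a materialization ends; {@code throwable} is null on success. */
    void finish(Throwable throwable) {
        // Every outcome ends the in-progress state, including cancellation.
        inProgress.decrement();
        if (throwable == null) {
            completed.increment();
        } else if (throwable instanceof CancellationException) {
            // Cancelled, e.g. due to task cancellation: neither completed nor failed.
        } else {
            failed.increment();
        }
    }
}
```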



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -672,14 +680,17 @@ public void updateChangelogSnapshotState(
                 subtaskName,
                 upTo,
                 materializedSnapshot);
+        List<KeyedStateHandle> materializedResult = getMaterializedResult(materializedSnapshot);
         changelogSnapshotState =
                 new ChangelogSnapshotState(
-                        getMaterializedResult(materializedSnapshot),
-                        Collections.emptyList(),
-                        upTo,
-                        materializationID);
+                        materializedResult, Collections.emptyList(), upTo, materializationID);
 
         stateChangelogWriter.truncate(upTo);
+        long fullSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getStateSize).sum();
+        long incSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getCheckpointedSize).sum();
+        metrics.reportCompletedMaterialization(fullSizeOfMaterialization, incSizeOfMaterialization);

Review Comment:
   The user is probably not that interested in materialization per se, but more in the snapshot that is reported to the JM, and in particular in the changelog part of that snapshot rather than the materialized part (so that they can infer recovery time).
   
   So I'd move this to `buildSnapshotResult` and report both materialized and non-materialized part sizes, WDYT?
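
   A minimal sketch of reporting both parts, reusing the summation from the diff above. `SizedHandle` is a hypothetical stand-in for `KeyedStateHandle` that exposes only the two getters used there, and `SnapshotSizeCalculator` is likewise an assumption, not the PR's code.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical handle exposing the two sizes used in the diff above. */
interface SizedHandle {
    long getStateSize();

    long getCheckpointedSize();
}

/** Hypothetical helper that sums sizes over one part of a snapshot. */
class SnapshotSizeCalculator {

    static long fullSize(List<? extends SizedHandle> handles) {
        return handles.stream().mapToLong(SizedHandle::getStateSize).sum();
    }

    static long incrementalSize(List<? extends SizedHandle> handles) {
        return handles.stream().mapToLong(SizedHandle::getCheckpointedSize).sum();
    }

    /** Builds a fixed-size handle for illustration. */
    static SizedHandle handle(long stateSize, long checkpointedSize) {
        return new SizedHandle() {
            @Override
            public long getStateSize() {
                return stateSize;
            }

            @Override
            public long getCheckpointedSize() {
                return checkpointedSize;
            }
        };
    }

    public static void main(String[] args) {
        List<SizedHandle> materialized = Arrays.asList(handle(100, 40), handle(50, 50));
        List<SizedHandle> nonMaterialized = Arrays.asList(handle(8, 8));
        System.out.println("materialized full size: " + fullSize(materialized));
        System.out.println("non-materialized full size: " + fullSize(nonMaterialized));
    }
}
```

   Keeping the summation in one helper would also cover the earlier suggestion of moving the calculation out of the backend.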



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;
+    private final Counter completedMaterializationCounter;
+    private final Counter failedMaterializationCounter;
+    private final UpdatableGauge<Long> lastFullSizeOfMaterializationGauge;
+    private final UpdatableGauge<Long> lastIncSizeOfMaterializationGauge;
+
+    private final ReentrantLock metricsReadWriteLock = new ReentrantLock();
+
+    ChangelogMetricGroup(MetricGroup parentMetricGroup) {
+        super(parentMetricGroup);
+        this.totalMaterializationCounter = counter(NUMBER_OF_TOTAL_MATERIALIZATION);
+        this.inProgressMaterializationCounter = counter(NUMBER_OF_IN_PROGRESS_MATERIALIZATION);
+        this.completedMaterializationCounter = counter(NUMBER_OF_COMPLETED_MATERIALIZATION);
+        this.failedMaterializationCounter = counter(NUMBER_OF_FAILED_MATERIALIZATION);
+        this.lastFullSizeOfMaterializationGauge =
+                gauge(LATEST_FULL_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+        this.lastIncSizeOfMaterializationGauge =
+                gauge(LATEST_INC_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+    }
+
+    void reportPendingMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            inProgressMaterializationCounter.inc();
+            totalMaterializationCounter.inc();
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportCompletedMaterialization(
+            long fullSizeOfMaterialization, long incSizeOfMaterialization) {
+        metricsReadWriteLock.lock();
+        try {
+            completedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+            lastFullSizeOfMaterializationGauge.updateValue(fullSizeOfMaterialization);
+            lastIncSizeOfMaterializationGauge.updateValue(incSizeOfMaterialization);
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportFailedMaterialization() {
+        metricsReadWriteLock.lock();

Review Comment:
   The synchronization is only needed because the failures are handled by different threads (from the async pool), right?
   
   It would be helpful to annotate the class (metrics group) as `@ThreadSafe` and to document the reason why synchronization is needed.
   
   As for `ReentrantLock`, can't we use `synchronized` instead?
   I don't see that any of the `ReentrantLock` features are used, while it's more error-prone and less readable than `synchronized`.
   
   Furthermore, if `Gauge` implementation has to be thread-safe then I'd consider using thread-safe implementations of counters for consistency (instead of synchronizing on the metrics group level).
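   
   A minimal sketch of the two alternatives, assuming a simplified stand-in for the metric group: `MaterializationCounters` and its methods are hypothetical, and plain JDK types are used in place of Flink's `Counter`.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for the metric group; not the actual Flink class. */
final class MaterializationCounters {
    // Alternative 1: guard plain fields with the intrinsic lock via synchronized methods.
    private long started;
    private long failed;

    synchronized void reportStarted() {
        started++;
    }

    synchronized void reportFailed() {
        failed++;
    }

    synchronized long getStarted() {
        return started;
    }

    synchronized long getFailed() {
        return failed;
    }

    // Alternative 2: make the counter itself thread-safe, so no group-level lock is needed.
    private final LongAdder completed = new LongAdder();

    void reportCompleted() {
        completed.increment();
    }

    long getCompleted() {
        return completed.sum();
    }
}
```

   With `synchronized` the lock is released automatically on every exit path, so the `try`/`finally` blocks go away. A thread-safe counter avoids a shared lock altogether, as long as no two metrics have to change atomically together.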



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;
+    private final Counter completedMaterializationCounter;
+    private final Counter failedMaterializationCounter;
+    private final UpdatableGauge<Long> lastFullSizeOfMaterializationGauge;
+    private final UpdatableGauge<Long> lastIncSizeOfMaterializationGauge;
+
+    private final ReentrantLock metricsReadWriteLock = new ReentrantLock();
+
+    ChangelogMetricGroup(MetricGroup parentMetricGroup) {
+        super(parentMetricGroup);
+        this.totalMaterializationCounter = counter(NUMBER_OF_TOTAL_MATERIALIZATION);
+        this.inProgressMaterializationCounter = counter(NUMBER_OF_IN_PROGRESS_MATERIALIZATION);
+        this.completedMaterializationCounter = counter(NUMBER_OF_COMPLETED_MATERIALIZATION);
+        this.failedMaterializationCounter = counter(NUMBER_OF_FAILED_MATERIALIZATION);
+        this.lastFullSizeOfMaterializationGauge =
+                gauge(LATEST_FULL_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+        this.lastIncSizeOfMaterializationGauge =
+                gauge(LATEST_INC_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+    }
+
+    void reportPendingMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            inProgressMaterializationCounter.inc();
+            totalMaterializationCounter.inc();
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportCompletedMaterialization(
+            long fullSizeOfMaterialization, long incSizeOfMaterialization) {
+        metricsReadWriteLock.lock();
+        try {
+            completedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+            lastFullSizeOfMaterializationGauge.updateValue(fullSizeOfMaterialization);
+            lastIncSizeOfMaterializationGauge.updateValue(incSizeOfMaterialization);
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportFailedMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            failedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    private boolean canDecrementOfInProgressMaterializationNumber() {
+        boolean decrementLeadsToNegativeNumber =
+                inProgressMaterializationCounter.getCount() - 1 < 0;
+        if (decrementLeadsToNegativeNumber) {
+            String errorMessage =
+                    "Incremented the completed number of materialization "
+                            + "without incrementing the in progress checkpoints before.";
+            LOG.warn(errorMessage);
+        }
+        return !decrementLeadsToNegativeNumber;
+    }
+
+    /** A gauge supports to update the metric value. */
+    private interface UpdatableGauge<T> extends Gauge<T> {
+        void updateValue(T newValue);

Review Comment:
   Why do we need to introduce this kind of `Gauge`?
   Isn't some implementation injected from the materialization code enough?
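   
   A minimal sketch of that idea, assuming a read-only gauge with a single `getValue()` method: `ReadOnlyGauge`, `MaterializationStats` and `GaugeInjection` are hypothetical names used only for illustration.

```java
/** Hypothetical read-only gauge with the same single-method shape as a gauge interface. */
interface ReadOnlyGauge<T> {
    T getValue();
}

/** Hypothetical holder owned by the materialization code; it keeps the latest size. */
final class MaterializationStats {
    // volatile so that a reporter thread sees the value written by the materialization thread
    private volatile long lastFullSize;

    void onCompleted(long fullSize) {
        lastFullSize = fullSize;
    }

    long getLastFullSize() {
        return lastFullSize;
    }
}

/** Builds the gauge that would be registered with the metric group. */
final class GaugeInjection {
    static ReadOnlyGauge<Long> lastFullSizeGauge(MaterializationStats stats) {
        // The gauge only reads; the owner of the stats object does the updating.
        return stats::getLastFullSize;
    }
}
```

   The value is then updated where it is produced, and the gauge stays a plain read-only view, so no separate updatable gauge type is needed.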



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;

Review Comment:
   I think this should be a `Gauge` because it can increase and decrease (`Counter.dec` is currently not used in Flink and should probably be removed).
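   
   A minimal sketch of an up/down value exposed through a gauge, assuming hypothetical names (`IntGauge`, `InProgressTracker`) and plain JDK types instead of the Flink metric classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical read-only gauge for an integer value. */
interface IntGauge {
    Integer getValue();
}

/** Hypothetical tracker for the number of in-progress materializations. */
final class InProgressTracker {
    private final AtomicInteger inProgress = new AtomicInteger();

    void started() {
        inProgress.incrementAndGet();
    }

    void finished() {
        // Clamp at zero so that an unmatched finish cannot drive the value negative.
        inProgress.updateAndGet(v -> Math.max(0, v - 1));
    }

    IntGauge asGauge() {
        return inProgress::get;
    }
}
```

   The atomic update replaces the separate check-then-decrement step, so no extra lock is needed to keep the value non-negative.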



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";

Review Comment:
   How about renaming
   ```
   numberOfTotalMaterialization -> startedMaterializations
   numberOfCompletedMaterialization -> completedMaterializations
   numberOfFailedMaterialization -> failedMaterializations
   ```
   ?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;

Review Comment:
   Furthermore, there can be at most one in-progress materialization.
   So we probably don't need a metric for that, WDYT?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java:
##########
@@ -132,8 +135,13 @@ public synchronized void close() {
     public void triggerMaterialization() {
         mailboxExecutor.execute(
                 () -> {
-                    Optional<MaterializationRunnable> materializationRunnableOptional =
-                            keyedStateBackend.initMaterialization();
+                    Optional<MaterializationRunnable> materializationRunnableOptional;
+                    try {
+                        materializationRunnableOptional = keyedStateBackend.initMaterialization();
+                    } catch (Exception ex) {
+                        metrics.reportFailedMaterialization();
+                        throw ex;

Review Comment:
   In this case, the task will most likely fail. I'm not sure we need to report the failure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] masteryhx commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r860810267


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;
+    private final Counter completedMaterializationCounter;
+    private final Counter failedMaterializationCounter;
+    private final UpdatableGauge<Long> lastFullSizeOfMaterializationGauge;
+    private final UpdatableGauge<Long> lastIncSizeOfMaterializationGauge;
+
+    private final ReentrantLock metricsReadWriteLock = new ReentrantLock();
+
+    ChangelogMetricGroup(MetricGroup parentMetricGroup) {
+        super(parentMetricGroup);
+        this.totalMaterializationCounter = counter(NUMBER_OF_TOTAL_MATERIALIZATION);
+        this.inProgressMaterializationCounter = counter(NUMBER_OF_IN_PROGRESS_MATERIALIZATION);
+        this.completedMaterializationCounter = counter(NUMBER_OF_COMPLETED_MATERIALIZATION);
+        this.failedMaterializationCounter = counter(NUMBER_OF_FAILED_MATERIALIZATION);
+        this.lastFullSizeOfMaterializationGauge =
+                gauge(LATEST_FULL_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+        this.lastIncSizeOfMaterializationGauge =
+                gauge(LATEST_INC_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+    }
+
+    void reportPendingMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            inProgressMaterializationCounter.inc();
+            totalMaterializationCounter.inc();
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportCompletedMaterialization(
+            long fullSizeOfMaterialization, long incSizeOfMaterialization) {
+        metricsReadWriteLock.lock();
+        try {
+            completedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+            lastFullSizeOfMaterializationGauge.updateValue(fullSizeOfMaterialization);
+            lastIncSizeOfMaterializationGauge.updateValue(incSizeOfMaterialization);
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportFailedMaterialization() {
+        metricsReadWriteLock.lock();

Review Comment:
   > The synchronization is only needed because the failures are handled by different threads (from the async pool), right?
   
   If we remove the in-progress counter, maybe we would no longer need the synchronization?


