Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/28 11:12:24 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #19598: [FLINK-25470][changelog] Expose more metrics of materialization

rkhachatryan commented on code in PR #19598:
URL: https://github.com/apache/flink/pull/19598#discussion_r860732625


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -694,6 +705,10 @@ public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
         return keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
     }
 
+    public ChangelogMetricGroup getMetrics() {
+        return metrics;
+    }

Review Comment:
   I think it makes sense to split `ChangelogMetricGroup` into two: one for the backend and one for the materializer.
   Furthermore, after moving size reporting to `buildSnapshotResult` (as suggested in the comment on `updateChangelogSnapshotState`),
   the backend would only need to report these sizes, and the materializer would report events such as started/completed/failed materializations.
   
   That would eliminate the dependency and synchronization issues, and make the responsibilities of the metric group classes clearer.
   
   WDYT?
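
A rough sketch of the proposed split, for illustration only: the class names `ChangelogBackendMetrics` and `MaterializationEventMetrics` are hypothetical, and plain `volatile` fields and `AtomicLong`s stand in for Flink's `Gauge`/`Counter` so the snippet compiles on its own. The backend only publishes snapshot sizes, while the materializer only counts lifecycle events, so neither class needs to know about the other.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical backend-side metrics: only the sizes of the snapshot reported to the JM. */
final class ChangelogBackendMetrics {
    // volatile: written by the task thread, read by the metric reporter thread
    private volatile long lastMaterializedSize;
    private volatile long lastNonMaterializedSize;

    void reportSnapshotSizes(long materializedSize, long nonMaterializedSize) {
        // Each field is published individually; the pair is not updated atomically.
        this.lastMaterializedSize = materializedSize;
        this.lastNonMaterializedSize = nonMaterializedSize;
    }

    long getLastMaterializedSize() {
        return lastMaterializedSize;
    }

    long getLastNonMaterializedSize() {
        return lastNonMaterializedSize;
    }
}

/** Hypothetical materializer-side metrics: only lifecycle events. */
final class MaterializationEventMetrics {
    private final AtomicLong started = new AtomicLong();
    private final AtomicLong completed = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    void reportStarted() {
        started.incrementAndGet();
    }

    void reportCompleted() {
        completed.incrementAndGet();
    }

    void reportFailed() {
        failed.incrementAndGet();
    }

    long getStarted() {
        return started.get();
    }

    long getCompleted() {
        return completed.get();
    }

    long getFailed() {
        return failed.get();
    }
}
```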



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -672,14 +680,17 @@ public void updateChangelogSnapshotState(
                 subtaskName,
                 upTo,
                 materializedSnapshot);
+        List<KeyedStateHandle> materializedResult = getMaterializedResult(materializedSnapshot);
         changelogSnapshotState =
                 new ChangelogSnapshotState(
-                        getMaterializedResult(materializedSnapshot),
-                        Collections.emptyList(),
-                        upTo,
-                        materializationID);
+                        materializedResult, Collections.emptyList(), upTo, materializationID);
 
         stateChangelogWriter.truncate(upTo);
+        long fullSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getStateSize).sum();
+        long incSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getCheckpointedSize).sum();

Review Comment:
   Maybe move this calculation to the metrics group class?
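
Assuming the calculation moves into the metric group, the backend could simply hand over the materialized handles. In this sketch `SizedHandle` is a hypothetical stand-in for `KeyedStateHandle` that exposes only the two accessors used in the diff (`getStateSize()` and `getCheckpointedSize()`), and `MaterializationSizeMetrics` is a hypothetical name:

```java
import java.util.List;

// Hypothetical stand-in for KeyedStateHandle, exposing only the size accessors used in the diff.
interface SizedHandle {
    long getStateSize();

    long getCheckpointedSize();
}

// Hypothetical metric group that owns the size calculation.
final class MaterializationSizeMetrics {
    private volatile long lastFullSize;
    private volatile long lastIncSize;

    void reportCompletedMaterialization(List<? extends SizedHandle> materializedResult) {
        // Full size: total state size of all materialized handles.
        lastFullSize = materializedResult.stream().mapToLong(SizedHandle::getStateSize).sum();
        // Incremental size: the checkpointed (newly persisted) part of each handle.
        lastIncSize =
                materializedResult.stream().mapToLong(SizedHandle::getCheckpointedSize).sum();
    }

    long getLastFullSize() {
        return lastFullSize;
    }

    long getLastIncSize() {
        return lastIncSize;
    }
}
```

On the backend side, the two stream computations would then collapse into a single call such as `metrics.reportCompletedMaterialization(materializedResult)`.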



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -216,6 +222,7 @@ public ChangelogKeyedStateBackend(
         this.executionConfig = executionConfig;
         this.ttlTimeProvider = ttlTimeProvider;
         this.keyValueStatesByName = new HashMap<>();
+        this.metrics = new ChangelogMetricGroup(metricGroup);

Review Comment:
   I think it's better to inject `ChangelogMetricGroup` into the constructor to make it more testable.
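
A minimal illustration of constructor injection, with heavily simplified hypothetical types (the real `ChangelogKeyedStateBackend` constructor takes many more arguments): the backend receives its metrics object instead of creating one from a `MetricGroup`, so a test can pass in a recording fake.

```java
import java.util.Objects;

// Hypothetical narrow interface for what the backend reports.
interface MaterializationMetricsSink {
    void reportCompletedMaterialization(long fullSize, long incSize);
}

// Hypothetical, heavily simplified backend.
final class BackendWithInjectedMetrics {
    private final MaterializationMetricsSink metrics;

    // The metrics object is passed in rather than built from a MetricGroup here.
    BackendWithInjectedMetrics(MaterializationMetricsSink metrics) {
        this.metrics = Objects.requireNonNull(metrics);
    }

    void onMaterializationCompleted(long fullSize, long incSize) {
        metrics.reportCompletedMaterialization(fullSize, incSize);
    }
}

// A trivial recording fake that a unit test could inject.
final class RecordingMetrics implements MaterializationMetricsSink {
    long lastFullSize = -1;
    long lastIncSize = -1;
    int calls;

    @Override
    public void reportCompletedMaterialization(long fullSize, long incSize) {
        lastFullSize = fullSize;
        lastIncSize = incSize;
        calls++;
    }
}
```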



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java:
##########
@@ -180,9 +188,11 @@ private void asyncMaterializationPhase(
                             } else if (throwable instanceof CancellationException) {
                                 // can happen e.g. due to task cancellation
                                 LOG.info("materialization cancelled", throwable);
+                                metrics.reportFailedMaterialization();

Review Comment:
   This is not a failure (although the in-progress counter/gauge probably still has to be updated).
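
One possible way to keep the in-progress value correct without counting cancellation as a failure. `MaterializationOutcomeMetrics` and `reportOutcome` are hypothetical names, and the callback is assumed to receive `null` on success, like the `throwable` argument in the hunk above:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: cancellation ends the in-progress phase but is not a failure.
final class MaterializationOutcomeMetrics {
    private final AtomicLong completed = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();
    private final AtomicLong inProgress = new AtomicLong();

    void reportStarted() {
        inProgress.incrementAndGet();
    }

    // Called from the async completion callback; throwable == null means success.
    void reportOutcome(Throwable throwable) {
        // Every outcome (success, failure, cancellation) ends the in-progress materialization.
        inProgress.decrementAndGet();
        if (throwable == null) {
            completed.incrementAndGet();
        } else if (!(throwable instanceof CancellationException)) {
            // Only genuine errors are counted as failures.
            failed.incrementAndGet();
        }
        // A CancellationException (e.g. due to task cancellation) records nothing else.
    }

    long getCompleted() {
        return completed.get();
    }

    long getFailed() {
        return failed.get();
    }

    long getInProgress() {
        return inProgress.get();
    }
}
```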



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -672,14 +680,17 @@ public void updateChangelogSnapshotState(
                 subtaskName,
                 upTo,
                 materializedSnapshot);
+        List<KeyedStateHandle> materializedResult = getMaterializedResult(materializedSnapshot);
         changelogSnapshotState =
                 new ChangelogSnapshotState(
-                        getMaterializedResult(materializedSnapshot),
-                        Collections.emptyList(),
-                        upTo,
-                        materializationID);
+                        materializedResult, Collections.emptyList(), upTo, materializationID);
 
         stateChangelogWriter.truncate(upTo);
+        long fullSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getStateSize).sum();
+        long incSizeOfMaterialization =
+                materializedResult.stream().mapToLong(KeyedStateHandle::getCheckpointedSize).sum();
+        metrics.reportCompletedMaterialization(fullSizeOfMaterialization, incSizeOfMaterialization);

Review Comment:
   The user is probably not that interested in materialization per se, but in the snapshot that is reported to the JM; in particular, in the changelog part of that snapshot, not the materialized part (so that they can infer recovery time).
   
   So I'd move this to `buildSnapshotResult` and report the sizes of both the materialized and non-materialized parts. WDYT?
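
A sketch of what reporting both parts could look like when the snapshot result is built. `SizedStateHandle` and `SnapshotSizeMetrics` are hypothetical stand-ins, and no particular signature of `buildSnapshotResult` is assumed; the helper simply receives the materialized and non-materialized handle lists:

```java
import java.util.List;

// Hypothetical stand-in for a state handle that only exposes its size.
interface SizedStateHandle {
    long getStateSize();
}

// Hypothetical helper called when the snapshot result for the JM is assembled.
final class SnapshotSizeMetrics {
    private volatile long lastMaterializedSize;
    private volatile long lastNonMaterializedSize;

    void reportSnapshot(
            List<? extends SizedStateHandle> materializedPart,
            List<? extends SizedStateHandle> nonMaterializedPart) {
        lastMaterializedSize = sum(materializedPart);
        // The changelog (non-materialized) part has to be replayed on recovery,
        // so its size is the more useful hint for expected recovery time.
        lastNonMaterializedSize = sum(nonMaterializedPart);
    }

    private static long sum(List<? extends SizedStateHandle> handles) {
        return handles.stream().mapToLong(SizedStateHandle::getStateSize).sum();
    }

    long getLastMaterializedSize() {
        return lastMaterializedSize;
    }

    long getLastNonMaterializedSize() {
        return lastNonMaterializedSize;
    }
}
```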



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Metrics related to the Changelog State Backend. */
+class ChangelogMetricGroup extends ProxyMetricGroup<MetricGroup> {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogMetricGroup.class);
+
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_FULL_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastFullSizeOfMaterialization";
+
+    @VisibleForTesting
+    static final String LATEST_INC_SIZE_OF_MATERIALIZATION =
+            PREFIX + ".lastIncSizeOfMaterialization";
+
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;
+    private final Counter completedMaterializationCounter;
+    private final Counter failedMaterializationCounter;
+    private final UpdatableGauge<Long> lastFullSizeOfMaterializationGauge;
+    private final UpdatableGauge<Long> lastIncSizeOfMaterializationGauge;
+
+    private final ReentrantLock metricsReadWriteLock = new ReentrantLock();
+
+    ChangelogMetricGroup(MetricGroup parentMetricGroup) {
+        super(parentMetricGroup);
+        this.totalMaterializationCounter = counter(NUMBER_OF_TOTAL_MATERIALIZATION);
+        this.inProgressMaterializationCounter = counter(NUMBER_OF_IN_PROGRESS_MATERIALIZATION);
+        this.completedMaterializationCounter = counter(NUMBER_OF_COMPLETED_MATERIALIZATION);
+        this.failedMaterializationCounter = counter(NUMBER_OF_FAILED_MATERIALIZATION);
+        this.lastFullSizeOfMaterializationGauge =
+                gauge(LATEST_FULL_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+        this.lastIncSizeOfMaterializationGauge =
+                gauge(LATEST_INC_SIZE_OF_MATERIALIZATION, new SimpleUpdatableGauge<>());
+    }
+
+    void reportPendingMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            inProgressMaterializationCounter.inc();
+            totalMaterializationCounter.inc();
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportCompletedMaterialization(
+            long fullSizeOfMaterialization, long incSizeOfMaterialization) {
+        metricsReadWriteLock.lock();
+        try {
+            completedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+            lastFullSizeOfMaterializationGauge.updateValue(fullSizeOfMaterialization);
+            lastIncSizeOfMaterializationGauge.updateValue(incSizeOfMaterialization);
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    void reportFailedMaterialization() {
+        metricsReadWriteLock.lock();

Review Comment:
   The synchronization is only needed because failures are handled by different threads (from the async pool), right?
   
   It would be helpful to annotate the class (metric group) as `@ThreadSafe` and to document why synchronization is needed.
   
   As for `ReentrantLock`, can't we use `synchronized` instead?
   I don't see any of the `ReentrantLock` features being used, while it's more error-prone and less readable than `synchronized`.
   
   Furthermore, if the `Gauge` implementation has to be thread-safe anyway, I'd consider using thread-safe counter implementations for consistency (instead of synchronizing at the metric group level).
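
A sketch of the `synchronized` variant, with the reason for synchronization documented on the class. It assumes, as the comment suggests, that events are reported from more than one thread; the class name is hypothetical, and a `@ThreadSafe` annotation (for example the JSR-305 `javax.annotation.concurrent.ThreadSafe`) is only mentioned in the Javadoc to keep the snippet dependency-free:

```java
/**
 * Hypothetical materialization metrics guarded by the object's intrinsic lock.
 *
 * <p>Thread safety: events are assumed to be reported both from the task thread and from
 * the asynchronous materialization pool, so every access is synchronized. In Flink the
 * class could additionally carry a {@code @ThreadSafe} annotation.
 */
final class SynchronizedMaterializationMetrics {
    private long started;
    private long completed;
    private long failed;
    private long lastFullSize;

    synchronized void reportStarted() {
        started++;
    }

    synchronized void reportCompleted(long fullSize) {
        completed++;
        lastFullSize = fullSize;
    }

    synchronized void reportFailed() {
        failed++;
    }

    // Readers (e.g. metric reporters) also synchronize, so they see up-to-date values.
    synchronized long getStarted() {
        return started;
    }

    synchronized long getCompleted() {
        return completed;
    }

    synchronized long getFailed() {
        return failed;
    }

    synchronized long getLastFullSize() {
        return lastFullSize;
    }
}
```

With thread-safe counters instead (e.g. backed by `AtomicLong` or `LongAdder`), the individual methods would not need a shared lock, at the cost of the counters no longer being updated atomically with respect to each other.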



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+    void reportFailedMaterialization() {
+        metricsReadWriteLock.lock();
+        try {
+            failedMaterializationCounter.inc();
+            if (canDecrementOfInProgressMaterializationNumber()) {
+                inProgressMaterializationCounter.dec();
+            }
+        } finally {
+            metricsReadWriteLock.unlock();
+        }
+    }
+
+    private boolean canDecrementOfInProgressMaterializationNumber() {
+        boolean decrementLeadsToNegativeNumber =
+                inProgressMaterializationCounter.getCount() - 1 < 0;
+        if (decrementLeadsToNegativeNumber) {
+            String errorMessage =
+                    "Incremented the completed number of materialization "
+                            + "without incrementing the in progress checkpoints before.";
+            LOG.warn(errorMessage);
+        }
+        return !decrementLeadsToNegativeNumber;
+    }
+
+    /** A gauge supports to update the metric value. */
+    private interface UpdatableGauge<T> extends Gauge<T> {
+        void updateValue(T newValue);

Review Comment:
   Why do we need to introduce this kind of `Gauge`?
   Isn't a plain implementation, injected from the materialization side, enough?
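
A sketch of the simpler alternative: the materializer keeps the latest value and a plain lambda gauge reads it. The `Gauge` interface below is a local stand-in mirroring `org.apache.flink.metrics.Gauge` (a single `T getValue()` method); `SizeTrackingMaterializer` and `GaugeRegistration` are hypothetical names. In Flink, the returned gauge would be registered via `MetricGroup#gauge(String, Gauge)`.

```java
// Local stand-in mirroring org.apache.flink.metrics.Gauge.
interface Gauge<T> {
    T getValue();
}

// Hypothetical materializer that simply remembers the last size it produced.
final class SizeTrackingMaterializer {
    private volatile long lastFullSize;

    void onMaterializationCompleted(long fullSize) {
        lastFullSize = fullSize;
    }

    long getLastFullSize() {
        return lastFullSize;
    }
}

final class GaugeRegistration {
    // A plain gauge that reads the current value on demand; no "updatable" gauge type needed.
    static Gauge<Long> lastFullSizeGauge(SizeTrackingMaterializer materializer) {
        return materializer::getLastFullSize;
    }
}
```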



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;

Review Comment:
   I think this should be a `Gauge` because it can both increase and decrease (`Counter.dec` is currently not used in Flink and should probably be removed).
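
A sketch of the in-progress value exposed as a gauge, so nothing relies on `Counter.dec()`. The names are hypothetical and `Gauge` is a local stand-in for `org.apache.flink.metrics.Gauge`; the clamp at zero mirrors the defensive check in `canDecrementOfInProgressMaterializationNumber()`:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in mirroring org.apache.flink.metrics.Gauge.
interface Gauge<T> {
    T getValue();
}

// Hypothetical tracker: the value goes up and down, so it is read on demand as a gauge.
final class InProgressMaterializations {
    private final AtomicInteger inProgress = new AtomicInteger();

    void onStarted() {
        inProgress.incrementAndGet();
    }

    // Called on completion, failure or cancellation; never drops below zero.
    void onFinished() {
        inProgress.updateAndGet(v -> v > 0 ? v - 1 : 0);
    }

    Gauge<Integer> asGauge() {
        return inProgress::get;
    }
}
```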



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+    private static final String PREFIX = "Changelog";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_TOTAL_MATERIALIZATION = PREFIX + ".numberOfTotalMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_IN_PROGRESS_MATERIALIZATION =
+            PREFIX + ".numberOfInProgressMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_COMPLETED_MATERIALIZATION =
+            PREFIX + ".numberOfCompletedMaterialization";
+
+    @VisibleForTesting
+    static final String NUMBER_OF_FAILED_MATERIALIZATION =
+            PREFIX + ".numberOfFailedMaterialization";

Review Comment:
   How about renaming
   ```
   numberOfTotalMaterialization -> startedMaterializations
   numberOfCompletedMaterialization -> completedMaterializations
   numberOfFailedMaterialization -> failedMaterializations
   ```
   ?
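
With the proposed names, the constants might look as follows (the holder class name is hypothetical; the `Changelog` prefix is kept from the diff):

```java
// Hypothetical holder for the renamed metric names.
final class MaterializationMetricNames {
    private static final String PREFIX = "Changelog";

    static final String STARTED_MATERIALIZATIONS = PREFIX + ".startedMaterializations";
    static final String COMPLETED_MATERIALIZATIONS = PREFIX + ".completedMaterializations";
    static final String FAILED_MATERIALIZATIONS = PREFIX + ".failedMaterializations";

    private MaterializationMetricNames() {}
}
```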



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMetricGroup.java:
##########
@@ -0,0 +1,150 @@
+    private final Counter totalMaterializationCounter;
+    private final Counter inProgressMaterializationCounter;

Review Comment:
   Furthermore, there can be at most one in-progress materialization.
   So we probably don't need a metric for that, WDYT?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PeriodicMaterializationManager.java:
##########
@@ -132,8 +135,13 @@ public synchronized void close() {
     public void triggerMaterialization() {
         mailboxExecutor.execute(
                 () -> {
-                    Optional<MaterializationRunnable> materializationRunnableOptional =
-                            keyedStateBackend.initMaterialization();
+                    Optional<MaterializationRunnable> materializationRunnableOptional;
+                    try {
+                        materializationRunnableOptional = keyedStateBackend.initMaterialization();
+                    } catch (Exception ex) {
+                        metrics.reportFailedMaterialization();
+                        throw ex;

Review Comment:
   In this case, the task will most likely fail. I'm not sure we need to report the failure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org