Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/19 12:30:54 UTC

[GitHub] [flink] masteryhx opened a new pull request, #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

masteryhx opened a new pull request, #19142:
URL: https://github.com/apache/flink/pull/19142

   
   ## What is the purpose of the change
   
   This pull request adds support for switching a job from having the changelog state backend enabled to having it disabled.
   
   ## Brief change log
   
    - *`StreamTaskStateInitializerImpl` uses `ChangelogStateBackend` to restore and return the delegated state backend while switching*
    - *Make `CheckpointBoundKeyedStateHandle` support rebinding the checkpoint id, and always rebind the checkpoint id while switching*
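
To illustrate the second item, here is a minimal sketch of what rebinding a checkpoint id could look like. It assumes an immutable handle and uses a hypothetical stand-in class (`BoundHandleSketch`, with a `rebound` method); it is not the Flink interface or the PR's implementation.

```java
// Hypothetical stand-in for a checkpoint-bound state handle; not a Flink class.
final class BoundHandleSketch {
    private final String stateLocation;
    private final long checkpointId;

    BoundHandleSketch(String stateLocation, long checkpointId) {
        this.stateLocation = stateLocation;
        this.checkpointId = checkpointId;
    }

    long getCheckpointId() {
        return checkpointId;
    }

    String getStateLocation() {
        return stateLocation;
    }

    // Returns a copy that points at the same state but is bound to another checkpoint id.
    // The original handle is left untouched.
    BoundHandleSketch rebound(long newCheckpointId) {
        return new BoundHandleSketch(stateLocation, newCheckpointId);
    }
}
```

Returning a new object rather than mutating the id in place keeps handles that are already referenced elsewhere stable; that is a design choice of this sketch, not something the PR description states.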
   
   ## Verifying this change
   
   Added `ChangelogPeriodicMaterializationSwitchStateBackendITCase.java`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] rkhachatryan commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r854517171


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageView.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+
+/**
+ * A storage view for changelog. Could produce {@link StateChangelogHandleReader} for read. Please
+ * use {@link StateChangelogStorageLoader} to obtain an instance.
+ */
+@Internal
+public interface StateChangelogStorageView<Handle extends ChangelogStateHandle>
+        extends AutoCloseable {
+
+    StateChangelogHandleReader<Handle> createReader();
+
+    @Override
+    default void close() throws Exception {}
+
+    default AvailabilityProvider getAvailabilityProvider() {
+        return () -> AvailabilityProvider.AVAILABLE;
+    }

Review Comment:
   This method is only relevant for writing, so it can be pushed down to the `StateChangelogStorage` interface.
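
The split suggested in this comment can be sketched as follows. All types are hypothetical, simplified stand-ins (the `...Sketch` names, `createWriter(String)` and the `CompletableFuture`-based availability are assumptions for illustration); only the idea of keeping the read-only view free of write-side methods comes from the comment.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-ins for the types named in the hunk above.
interface AvailabilitySketch {
    CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    CompletableFuture<?> getAvailableFuture();
}

interface ReaderSketch<H> {}

interface WriterSketch<H> {}

// Read-only view: recovery only needs a reader.
interface StorageViewSketch<H> extends AutoCloseable {
    ReaderSketch<H> createReader();

    @Override
    default void close() throws Exception {}
}

// Read-write storage: writers and the availability provider live here.
interface StorageSketch<H> extends StorageViewSketch<H> {
    WriterSketch<H> createWriter(String operatorId);

    default AvailabilitySketch getAvailabilityProvider() {
        return () -> AvailabilitySketch.AVAILABLE;
    }
}
```

With this layout, code that only restores state depends on `StorageViewSketch` and never sees `getAvailabilityProvider()`.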



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,26 +364,57 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        // Wrapping ChangelogStateBackend or ChangelogStateBackendHandle is not supported currently.
+        if (!isChangelogStateBackend(originalStateBackend)
+                && keyedStateHandles.stream()
+                        .anyMatch(
+                                stateHandle ->
+                                        stateHandle instanceof ChangelogStateBackendHandle)) {
+            return loadChangelogStateBackend(
+                    originalStateBackend, classLoader, CHANGELOG_STATE_BACKEND_FOR_RECOVERY);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend) {
+        return CHANGELOG_STATE_BACKEND.equals(backend.getClass().getName());
+    }
+
     private static StateBackend loadChangelogStateBackend(
-            StateBackend backend, ClassLoader classLoader) throws DynamicCodeLoadingException {
+            StateBackend backend, ClassLoader classLoader, String className)

Review Comment:
   NIT: the method is no longer specific to changelog (it is specific to `DelegatingStateBackend`), so it can be renamed to something like `wrapStateBackend`.
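
For readers following the hunk above, the wrap-or-not decision in `loadStateBackendFromKeyedStateHandles` can be restated with simplified types. This is a sketch under assumptions: every `...Sketch` type is hypothetical, and `instanceof` replaces the class-name comparison and reflective loading used in the diff.

```java
import java.util.Collection;

// Hypothetical stand-ins for state handles and state backends; not Flink classes.
interface HandleSketch {}

final class PlainHandleSketch implements HandleSketch {}

final class ChangelogHandleSketch implements HandleSketch {}

interface BackendSketch {}

final class PlainBackendSketch implements BackendSketch {}

final class ChangelogBackendSketch implements BackendSketch {}

// Wrapper used only to restore changelog state into the configured backend.
final class RecoveryWrapperSketch implements BackendSketch {
    final BackendSketch delegate;

    RecoveryWrapperSketch(BackendSketch delegate) {
        this.delegate = delegate;
    }
}

final class BackendLoaderSketch {
    // Wrap only when the configured backend is not already a changelog backend
    // and at least one handle to restore was written by a changelog backend.
    static BackendSketch loadForRecovery(
            BackendSketch configured, Collection<? extends HandleSketch> handles) {
        boolean alreadyChangelog = configured instanceof ChangelogBackendSketch;
        boolean restoringChangelogState =
                handles.stream().anyMatch(h -> h instanceof ChangelogHandleSketch);
        if (!alreadyChangelog && restoringChangelogState) {
            return new RecoveryWrapperSketch(configured);
        }
        return configured;
    }
}
```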



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+
+/** A {@link ChangelogRestoreTarget} supports to migrate to the delegated keyed state backend. */
+public class ChangelogMigrationRestoreTarget<K> implements ChangelogRestoreTarget<K> {
+
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private final ChangelogStateFactory changelogStateFactory;
+
+    private final FunctionDelegationHelper functionDelegationHelper =
+            new FunctionDelegationHelper();
+
+    public ChangelogMigrationRestoreTarget(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ChangelogStateFactory changelogStateFactory) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.changelogStateFactory = changelogStateFactory;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State, V> S createKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+            throws Exception {
+        S keyedState =
+                keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+        functionDelegationHelper.addOrUpdate(stateDescriptor);
+        final InternalKvState<K, N, V> kvState = (InternalKvState<K, N, V>) keyedState;
+        ChangelogState changelogState =
+                changelogStateFactory.create(stateDescriptor, kvState, keyedStateBackend);
+        return (S) changelogState;
+    }
+
+    @Nonnull
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> createPqState(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+        ChangelogKeyGroupedPriorityQueue<T> queue =
+                (ChangelogKeyGroupedPriorityQueue<T>)
+                        changelogStateFactory.getExistingState(
+                                stateName, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+        if (queue == null) {
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue =
+                    keyedStateBackend.create(stateName, byteOrderedElementSerializer);
+            queue =
+                    changelogStateFactory.create(
+                            stateName, internalPriorityQueue, byteOrderedElementSerializer);
+        }
+        return queue;
+    }
+
+    @Override
+    public ChangelogState getExistingState(
+            String name, StateMetaInfoSnapshot.BackendStateType type) {
+        return changelogStateFactory.getExistingState(name, type);
+    }
+
+    @Override
+    public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
+        // TODO: This inner class make the behaviour of the method of create consistent with
+        //  the method of getOrCreateKeyedState currently which could be removed
+        //  after we support state migration (in FLINK-23143).
+        //  It is also used to maintain FunctionDelegationHelper in the delegated state backend.
+        return new AbstractKeyedStateBackend<K>(keyedStateBackend) {
+

Review Comment:
   I'm wondering whether it makes sense to extract this constructor call into a static function; that would prevent leaking `this` into the created object.
   WDYT?
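
The concern about leaking `this` can be shown in isolation. In the sketch below (hypothetical names throughout, with `Supplier<String>` standing in for the anonymous `AbstractKeyedStateBackend` subclass), an anonymous class created in an instance method that reads an instance field holds a reference to the enclosing object, while one created in a static factory captures only the arguments passed in.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the restore target; not a Flink class.
class RestoreTargetSketch {
    private final String backendName;

    RestoreTargetSketch(String backendName) {
        this.backendName = backendName;
    }

    // Anonymous class created in an instance method: reading the field goes through
    // RestoreTargetSketch.this, so the created object references the enclosing instance.
    Supplier<String> capturing() {
        return new Supplier<String>() {
            @Override
            public String get() {
                return backendName;
            }
        };
    }

    // Delegates to a static factory, so only the explicit argument is captured.
    Supplier<String> nonCapturing() {
        return create(backendName);
    }

    private static Supplier<String> create(String backendName) {
        return new Supplier<String>() {
            @Override
            public String get() {
                return backendName; // captured parameter, no enclosing instance involved
            }
        };
    }
}
```

The static variant also makes the dependencies of the created object explicit in the factory's parameter list.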



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Maintains the lifecycle of all {@link ChangelogState}s. */
+public class ChangelogStateFactory {
+
+    /**
+     * Unwrapped changelog states used for recovery (not wrapped into e.g. TTL, latency tracking).
+     */
+    private final Map<String, ChangelogState> changelogStates;
+
+    private final Map<String, ChangelogKeyGroupedPriorityQueue<?>> priorityQueueStatesByName;
+
+    public ChangelogStateFactory() {
+        this.changelogStates = new HashMap<>();
+        this.priorityQueueStatesByName = new HashMap<>();
+    }
+
+    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    StateDescriptor.Type.VALUE,
+                                    (StateFactory) ChangelogValueState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.LIST,
+                                    (StateFactory) ChangelogListState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.REDUCING,
+                                    (StateFactory) ChangelogReducingState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.AGGREGATING,
+                                    (StateFactory) ChangelogAggregatingState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.MAP,
+                                    (StateFactory) ChangelogMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    public <K, N, V, S extends State> ChangelogState create(
+            StateDescriptor<S, V> stateDescriptor,
+            InternalKvState<K, N, V> internalKvState,
+            InternalKeyContext<K> keyContext)
+            throws Exception {
+        return create(
+                stateDescriptor, internalKvState, VoidStateChangeLogger.getInstance(), keyContext);
+    }
+
+    public <K, N, V, S extends State> ChangelogState create(
+            StateDescriptor<S, V> stateDescriptor,
+            InternalKvState<K, N, V> internalKvState,
+            KvStateChangeLogger<V, N> kvStateChangeLogger,
+            InternalKeyContext<K> keyContext)
+            throws Exception {
+        ChangelogState changelogState =
+                getStateFactory(stateDescriptor)
+                        .create(internalKvState, kvStateChangeLogger, keyContext);
+        changelogStates.put(stateDescriptor.getName(), changelogState);
+        return changelogState;
+    }
+
+    public <T> ChangelogKeyGroupedPriorityQueue<T> create(
+            String stateName,
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue,
+            TypeSerializer<T> serializer) {
+        return create(
+                stateName, internalPriorityQueue, VoidStateChangeLogger.getInstance(), serializer);
+    }
+
+    public <T> ChangelogKeyGroupedPriorityQueue<T> create(
+            String stateName,
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue,
+            StateChangeLogger<T, Void> logger,
+            TypeSerializer<T> serializer) {
+        ChangelogKeyGroupedPriorityQueue<T> changelogKeyGroupedPriorityQueue =
+                new ChangelogKeyGroupedPriorityQueue<>(internalPriorityQueue, logger, serializer);
+        priorityQueueStatesByName.put(stateName, changelogKeyGroupedPriorityQueue);
+        return changelogKeyGroupedPriorityQueue;
+    }
+
+    /**
+     * @param name state name
+     * @param type state type (the only supported type currently are: {@link
+     *     StateMetaInfoSnapshot.BackendStateType#KEY_VALUE key value}, {@link
+     *     StateMetaInfoSnapshot.BackendStateType#PRIORITY_QUEUE priority queue})
+     * @return an existing state, i.e. the one that was already created. The returned state will not
+     *     apply TTL to the passed values, regardless of the TTL settings. This prevents double
+     *     applying of TTL (recovered values are TTL values if TTL was enabled). The state will,
+     *     however, use TTL serializer if TTL is enabled. WARN: only valid during the recovery.
+     * @throws UnsupportedOperationException if state type is not supported
+     */
+    public ChangelogState getExistingState(String name, StateMetaInfoSnapshot.BackendStateType type)
+            throws UnsupportedOperationException {
+        ChangelogState state;
+        switch (type) {
+            case KEY_VALUE:
+                state = changelogStates.get(name);
+                break;
+            case PRIORITY_QUEUE:
+                state = priorityQueueStatesByName.get(name);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unknown state type %s (%s)", type, name));
+        }
+        return state;
+    }
+
+    public void resetAllWritingMetaFlags() {
+        for (ChangelogState changelogState : changelogStates.values()) {
+            changelogState.resetWritingMetaFlag();
+        }
+
+        for (ChangelogKeyGroupedPriorityQueue<?> priorityQueueState :
+                priorityQueueStatesByName.values()) {
+            priorityQueueState.resetWritingMetaFlag();
+        }
+    }
+
+    public void dispose() {
+        changelogStates.clear();
+        priorityQueueStatesByName.clear();
+    }
+
+    private <S extends State, V> StateFactory getStateFactory(
+            StateDescriptor<S, V> stateDescriptor) {
+        StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getType());
+        if (stateFactory == null) {
+            String message =
+                    String.format(
+                            "State %s is not supported by %s",
+                            stateDescriptor.getClass(), ChangelogKeyedStateBackend.class);
+            throw new FlinkRuntimeException(message);
+        }
+        return stateFactory;
+    }
+
+    private static class VoidStateChangeLogger<Value, Namespace>
+            implements KvStateChangeLogger<Value, Namespace>, StateChangeLogger<Value, Namespace> {

Review Comment:
   This logger is only used by `ChangelogMigrationRestoreTarget`, right?
   If so, I'd move it there and inline the methods that are using it (`create`).



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+
+/** A {@link ChangelogRestoreTarget} supports to migrate to the delegated keyed state backend. */
+public class ChangelogMigrationRestoreTarget<K> implements ChangelogRestoreTarget<K> {
+
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private final ChangelogStateFactory changelogStateFactory;
+
+    private final FunctionDelegationHelper functionDelegationHelper =
+            new FunctionDelegationHelper();
+
+    public ChangelogMigrationRestoreTarget(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ChangelogStateFactory changelogStateFactory) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.changelogStateFactory = changelogStateFactory;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State, V> S createKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+            throws Exception {
+        S keyedState =
+                keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+        functionDelegationHelper.addOrUpdate(stateDescriptor);
+        final InternalKvState<K, N, V> kvState = (InternalKvState<K, N, V>) keyedState;
+        ChangelogState changelogState =
+                changelogStateFactory.create(stateDescriptor, kvState, keyedStateBackend);
+        return (S) changelogState;
+    }
+
+    @Nonnull
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> createPqState(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+        ChangelogKeyGroupedPriorityQueue<T> queue =
+                (ChangelogKeyGroupedPriorityQueue<T>)
+                        changelogStateFactory.getExistingState(
+                                stateName, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+        if (queue == null) {
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue =
+                    keyedStateBackend.create(stateName, byteOrderedElementSerializer);
+            queue =
+                    changelogStateFactory.create(
+                            stateName, internalPriorityQueue, byteOrderedElementSerializer);
+        }
+        return queue;
+    }
+
+    @Override
+    public ChangelogState getExistingState(
+            String name, StateMetaInfoSnapshot.BackendStateType type) {
+        return changelogStateFactory.getExistingState(name, type);
+    }
+
+    @Override
+    public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
+        // TODO: This inner class make the behaviour of the method of create consistent with
+        //  the method of getOrCreateKeyedState currently which could be removed
+        //  after we support state migration (in FLINK-23143).
+        //  It is also used to maintain FunctionDelegationHelper in the delegated state backend.
+        return new AbstractKeyedStateBackend<K>(keyedStateBackend) {
+
+            @Override
+            public void setCurrentKey(K newKey) {
+                keyedStateBackend.setCurrentKey(newKey);
+            }
+
+            @Override
+            public void notifyCheckpointComplete(long checkpointId) throws Exception {
+                keyedStateBackend.notifyCheckpointComplete(checkpointId);
+            }
+
+            @Nonnull
+            @Override
+            public SavepointResources<K> savepoint() throws Exception {
+                return keyedStateBackend.savepoint();
+            }
+
+            @Override
+            public int numKeyValueStateEntries() {
+                return keyedStateBackend.numKeyValueStateEntries();
+            }
+
+            @Override
+            public <N> Stream<K> getKeys(String state, N namespace) {
+                return keyedStateBackend.getKeys(state, namespace);
+            }
+
+            @Override
+            public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
+                return keyedStateBackend.getKeysAndNamespaces(state);
+            }
+
+            @Nonnull
+            @Override
+            public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
+                    @Nonnull TypeSerializer<N> namespaceSerializer,
+                    @Nonnull StateDescriptor<S, SV> stateDesc,
+                    @Nonnull
+                            StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
+                                    snapshotTransformFactory)
+                    throws Exception {
+                return keyedStateBackend.createInternalState(
+                        namespaceSerializer, stateDesc, snapshotTransformFactory);
+            }
+
+            @Override
+            public <N, S extends State> S getPartitionedState(
+                    N namespace,
+                    TypeSerializer<N> namespaceSerializer,
+                    StateDescriptor<S, ?> stateDescriptor)
+                    throws Exception {
+                S partitionedState =
+                        keyedStateBackend.getPartitionedState(
+                                namespace, namespaceSerializer, stateDescriptor);
+                functionDelegationHelper.addOrUpdate(stateDescriptor);
+                return partitionedState;
+            }
+
+            @Override
+            public <N, S extends State, V> S getOrCreateKeyedState(
+                    TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+                    throws Exception {
+                S keyedState =
+                        keyedStateBackend.getOrCreateKeyedState(
+                                namespaceSerializer, stateDescriptor);
+                functionDelegationHelper.addOrUpdate(stateDescriptor);
+                return keyedState;
+            }
+
+            @Nonnull
+            @Override
+            @SuppressWarnings("unchecked")
+            public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+                    KeyGroupedInternalPriorityQueue<T> create(
+                            @Nonnull String stateName,
+                            @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+                ChangelogKeyGroupedPriorityQueue<T> existingState =
+                        (ChangelogKeyGroupedPriorityQueue<T>)
+                                changelogStateFactory.getExistingState(
+                                        stateName,
+                                        StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+                return existingState == null
+                        ? keyedStateBackend.create(stateName, byteOrderedElementSerializer)
+                        : existingState;
+            }
+
+            @Nonnull
+            @Override
+            public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
+                    long checkpointId,
+                    long timestamp,
+                    @Nonnull CheckpointStreamFactory streamFactory,
+                    @Nonnull CheckpointOptions checkpointOptions)
+                    throws Exception {
+                return keyedStateBackend.snapshot(
+                        checkpointId, timestamp, streamFactory, checkpointOptions);
+            }
+
+            @Override
+            public void dispose() {
+                super.dispose();
+                changelogStateFactory.dispose();

Review Comment:
   Why is this not `keyedStateBackend.dispose()`, by analogy with the other methods?
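
For illustration only, the delegation pattern the question points at can be sketched as follows. `DisposableResource` and `DelegatingBackendSketch` are hypothetical stand-ins, not Flink types, and the sketch makes no claim about what the merged code does with `super.dispose()`.

```java
// Hypothetical stand-in for anything with a dispose() lifecycle method.
interface DisposableResource {
    void dispose();
}

// Hypothetical wrapper: forwards dispose() to the wrapped backend, like the
// other overridden methods do, and then releases the resources it owns itself.
final class DelegatingBackendSketch implements DisposableResource {
    private final DisposableResource wrappedBackend;
    private final DisposableResource ownedFactory;

    DelegatingBackendSketch(DisposableResource wrappedBackend, DisposableResource ownedFactory) {
        this.wrappedBackend = wrappedBackend;
        this.ownedFactory = ownedFactory;
    }

    @Override
    public void dispose() {
        // Delegate first so the wrapped backend is always cleaned up.
        wrappedBackend.dispose();
        // Then release what the wrapper created on its own.
        ownedFactory.dispose();
    }
}
```

The point of the pattern is that a wrapper which forwards every other call should also forward cleanup; otherwise the wrapped backend's resources may never be released.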



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myasuka commented on pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
Myasuka commented on PR #19142:
URL: https://github.com/apache/flink/pull/19142#issuecomment-1107691046

   @flinkbot run azure




[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r853017951


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {
+            return loadChangelogStateBackend(originalStateBackend, classLoader, true);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend)
+            throws DynamicCodeLoadingException {
+        try {
+            return Class.forName(CHANGELOG_STATE_BACKEND) == backend.getClass();
+        } catch (ClassNotFoundException e) {
+            throw new DynamicCodeLoadingException(
+                    "Cannot find DelegateStateBackend class: " + CHANGELOG_STATE_BACKEND, e);
+        }
+    }
+
     private static StateBackend loadChangelogStateBackend(
-            StateBackend backend, ClassLoader classLoader) throws DynamicCodeLoadingException {
+            StateBackend backend, ClassLoader classLoader, boolean forSwitch)
+            throws DynamicCodeLoadingException {
 
         // ChangelogStateBackend resides in a separate module, load it using reflection
         try {
             Constructor<? extends DelegatingStateBackend> constructor =
                     Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader)
                             .asSubclass(DelegatingStateBackend.class)
-                            .getDeclaredConstructor(StateBackend.class);
+                            .getDeclaredConstructor(StateBackend.class, boolean.class);
             constructor.setAccessible(true);
-            return constructor.newInstance(backend);
+            return constructor.newInstance(backend, forSwitch);

Review Comment:
   Yeah, I think your suggestion makes sense.
   For now I use `ChangelogStateBackendForRecovery` to support recovery and switching.
   I haven't come up with a better name for it, though. Do you have any suggestions?





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r854497343


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -302,4 +354,14 @@ private long getMaterializationID(KeyedStateHandle keyedStateHandle) {
             return 0L;
         }
     }
+
+    private Collection<ChangelogStateBackendHandle> reboundCheckpoint(
+            Collection<ChangelogStateBackendHandle> stateBackendHandles) {
+        return stateBackendHandles.stream()
+                .map(
+                        changelogStateBackendHandle ->
+                                changelogStateBackendHandle.rebound(
+                                        changelogStateBackendHandle.getCheckpointId()))

Review Comment:
   Thanks!





[GitHub] [flink] Myasuka commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r855747431


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java:
##########
@@ -155,6 +155,25 @@ public AbstractKeyedStateBackend(
         this.keySelectionListeners = new ArrayList<>(1);
     }
 
+    // Copy constructor
+    public AbstractKeyedStateBackend(AbstractKeyedStateBackend<K> abstractKeyedStateBackend) {

Review Comment:
   Can we introduce a `protected` constructor here, and let all constructors delegate to the same private base constructor?
   By doing so, we can then introduce another inner class within `ChangelogMigrationRestoreTarget` that also extends `AbstractKeyedStateBackend`.
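
A minimal sketch of the constructor layout suggested above, assuming a simplified stand-in class. `KeyedBackendSketch` and its fields are hypothetical and much smaller than the real `AbstractKeyedStateBackend`; only the chaining of constructors is the point.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an abstract keyed state backend.
abstract class KeyedBackendSketch<K> {
    protected final String keySerializerName;
    protected final int numberOfKeyGroups;
    private final List<Runnable> keySelectionListeners;

    // Regular constructor used by concrete backends.
    public KeyedBackendSketch(String keySerializerName, int numberOfKeyGroups) {
        this(keySerializerName, numberOfKeyGroups, new ArrayList<>(1));
    }

    // Copy constructor, protected so that only subclasses (including
    // anonymous or inner ones) can wrap an existing backend.
    protected KeyedBackendSketch(KeyedBackendSketch<K> other) {
        this(other.keySerializerName, other.numberOfKeyGroups, other.keySelectionListeners);
    }

    // Single private base constructor: the only place fields are assigned.
    private KeyedBackendSketch(
            String keySerializerName,
            int numberOfKeyGroups,
            List<Runnable> keySelectionListeners) {
        this.keySerializerName = keySerializerName;
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.keySelectionListeners = keySelectionListeners;
    }
}
```

With this layout a wrapper can be written as `new KeyedBackendSketch<String>(existing) { ... }`, and adding a field later means touching one constructor body rather than several.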



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/DeactivatedChangelogStateBackend.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation;
+import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.BaseBackendBuilder;
+import org.apache.flink.state.changelog.restore.ChangelogMigrationRestoreTarget;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/**
+ * This state backend use delegatedStateBackend and State changes to restore to the
+ * delegatedStateBackend which used in switching Changelog from enabled to disabled.

Review Comment:
   ```suggestion
    * delegatedStateBackend in which switching Changelog from enabled to disabled.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java:
##########
@@ -117,6 +117,8 @@ public abstract class MetadataV2V3SerializerBase {
     private static final byte INCREMENTAL_KEY_GROUPS_HANDLE_V2 = 11;
     // KEY_GROUPS_HANDLE_V2 is introduced to add new field of stateHandleId.
     private static final byte KEY_GROUPS_HANDLE_V2 = 12;
+    private static final byte CHANGELOG_HANDLE_V2 = 13;
+    private static final byte CHANGELOG_FILE_INCREMENT_HANDLE_V2 = 14;

Review Comment:
   Can we add comments in the code explaining why these two new flags are needed?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java:
##########
@@ -99,4 +101,24 @@ public static StateChangelogStorage<?> load(
             return factory.createStorage(configuration, metricGroup);
         }
     }
+
+    @Nonnull
+    public static StateChangelogStorageView<?> load(ChangelogStateHandle changelogStateHandle)

Review Comment:
   Can we use a different name to distinguish this method from the `load` method above?





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r855863007


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java:
##########
@@ -155,6 +155,25 @@ public AbstractKeyedStateBackend(
         this.keySelectionListeners = new ArrayList<>(1);
     }
 
+    // Copy constructor
+    public AbstractKeyedStateBackend(AbstractKeyedStateBackend<K> abstractKeyedStateBackend) {

Review Comment:
   I have modified it as you suggested.





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r855089509


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+
+/** A {@link ChangelogRestoreTarget} supports to migrate to the delegated keyed state backend. */
+public class ChangelogMigrationRestoreTarget<K> implements ChangelogRestoreTarget<K> {
+
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private final ChangelogStateFactory changelogStateFactory;
+
+    private final FunctionDelegationHelper functionDelegationHelper =
+            new FunctionDelegationHelper();
+
+    public ChangelogMigrationRestoreTarget(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ChangelogStateFactory changelogStateFactory) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.changelogStateFactory = changelogStateFactory;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State, V> S createKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+            throws Exception {
+        S keyedState =
+                keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+        functionDelegationHelper.addOrUpdate(stateDescriptor);
+        final InternalKvState<K, N, V> kvState = (InternalKvState<K, N, V>) keyedState;
+        ChangelogState changelogState =
+                changelogStateFactory.create(stateDescriptor, kvState, keyedStateBackend);
+        return (S) changelogState;
+    }
+
+    @Nonnull
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> createPqState(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+        ChangelogKeyGroupedPriorityQueue<T> queue =
+                (ChangelogKeyGroupedPriorityQueue<T>)
+                        changelogStateFactory.getExistingState(
+                                stateName, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+        if (queue == null) {
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue =
+                    keyedStateBackend.create(stateName, byteOrderedElementSerializer);
+            queue =
+                    changelogStateFactory.create(
+                            stateName, internalPriorityQueue, byteOrderedElementSerializer);
+        }
+        return queue;
+    }
+
+    @Override
+    public ChangelogState getExistingState(
+            String name, StateMetaInfoSnapshot.BackendStateType type) {
+        return changelogStateFactory.getExistingState(name, type);
+    }
+
+    @Override
+    public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
+        // TODO: This inner class make the behaviour of the method of create consistent with
+        //  the method of getOrCreateKeyedState currently which could be removed
+        //  after we support state migration (in FLINK-23143).
+        //  It is also used to maintain FunctionDelegationHelper in the delegated state backend.
+        return new AbstractKeyedStateBackend<K>(keyedStateBackend) {
+
+            @Override
+            public void setCurrentKey(K newKey) {
+                keyedStateBackend.setCurrentKey(newKey);
+            }
+
+            @Override
+            public void notifyCheckpointComplete(long checkpointId) throws Exception {
+                keyedStateBackend.notifyCheckpointComplete(checkpointId);
+            }
+
+            @Nonnull
+            @Override
+            public SavepointResources<K> savepoint() throws Exception {
+                return keyedStateBackend.savepoint();
+            }
+
+            @Override
+            public int numKeyValueStateEntries() {
+                return keyedStateBackend.numKeyValueStateEntries();
+            }
+
+            @Override
+            public <N> Stream<K> getKeys(String state, N namespace) {
+                return keyedStateBackend.getKeys(state, namespace);
+            }
+
+            @Override
+            public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
+                return keyedStateBackend.getKeysAndNamespaces(state);
+            }
+
+            @Nonnull
+            @Override
+            public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
+                    @Nonnull TypeSerializer<N> namespaceSerializer,
+                    @Nonnull StateDescriptor<S, SV> stateDesc,
+                    @Nonnull
+                            StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
+                                    snapshotTransformFactory)
+                    throws Exception {
+                return keyedStateBackend.createInternalState(
+                        namespaceSerializer, stateDesc, snapshotTransformFactory);
+            }
+
+            @Override
+            public <N, S extends State> S getPartitionedState(
+                    N namespace,
+                    TypeSerializer<N> namespaceSerializer,
+                    StateDescriptor<S, ?> stateDescriptor)
+                    throws Exception {
+                S partitionedState =
+                        keyedStateBackend.getPartitionedState(
+                                namespace, namespaceSerializer, stateDescriptor);
+                functionDelegationHelper.addOrUpdate(stateDescriptor);
+                return partitionedState;
+            }
+
+            @Override
+            public <N, S extends State, V> S getOrCreateKeyedState(
+                    TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+                    throws Exception {
+                S keyedState =
+                        keyedStateBackend.getOrCreateKeyedState(
+                                namespaceSerializer, stateDescriptor);
+                functionDelegationHelper.addOrUpdate(stateDescriptor);
+                return keyedState;
+            }
+
+            @Nonnull
+            @Override
+            @SuppressWarnings("unchecked")
+            public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+                    KeyGroupedInternalPriorityQueue<T> create(
+                            @Nonnull String stateName,
+                            @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+                ChangelogKeyGroupedPriorityQueue<T> existingState =
+                        (ChangelogKeyGroupedPriorityQueue<T>)
+                                changelogStateFactory.getExistingState(
+                                        stateName,
+                                        StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+                return existingState == null
+                        ? keyedStateBackend.create(stateName, byteOrderedElementSerializer)
+                        : existingState;
+            }
+
+            @Nonnull
+            @Override
+            public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
+                    long checkpointId,
+                    long timestamp,
+                    @Nonnull CheckpointStreamFactory streamFactory,
+                    @Nonnull CheckpointOptions checkpointOptions)
+                    throws Exception {
+                return keyedStateBackend.snapshot(
+                        checkpointId, timestamp, streamFactory, checkpointOptions);
+            }
+
+            @Override
+            public void dispose() {
+                super.dispose();
+                changelogStateFactory.dispose();

Review Comment:
   You are right. Just modified.





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r847968836


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java:
##########
@@ -391,7 +394,7 @@ static void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputSt
             dos.writeLong(handle.getStateSize());
             dos.writeLong(handle.getCheckpointedSize());
             writeStateHandleId(handle, dos);
-
+            dos.writeUTF(handle.getStorageIdentifier());

Review Comment:
   I am thinking about a more general case: switching the changelog storage.
   There are some problems in the current implementation, which uses the same changelog storage for both reading and writing.
   I think we should read from the original changelog storage and write to the new one.
   This also makes sense when switching from enabled to disabled.
   But currently we cannot get the original changelog storage.
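
As a rough illustration of why the hunk above writes a storage identifier into the handle, the following sketch round-trips such an identifier with the standard `DataOutputStream.writeUTF` and `DataInputStream.readUTF` calls. `StorageIdRoundTrip`, its method names, and the two-field layout are assumptions made for this example; the real metadata format contains more fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper: persists a storage identifier next to the state size so
// that a restore can tell which changelog storage the handle was written by.
final class StorageIdRoundTrip {
    private StorageIdRoundTrip() {}

    static byte[] write(long stateSize, String storageIdentifier) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeLong(stateSize);
            dos.writeUTF(storageIdentifier);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String readStorageIdentifier(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            dis.readLong(); // skip the state size written before the identifier
            return dis.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Once the identifier is part of the serialized handle, the restoring side could in principle select a reader for the original storage while writing new changes elsewhere, which is the separation described in the comment.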





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r853009395


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java:
##########
@@ -0,0 +1,33 @@
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/** Filesystem-based implementation of {@link StateChangelogStorage} just for recovery. */
+@Experimental
+@ThreadSafe
+public class FsStateChangelogStorageForRecovery
+        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
+
+    @Override
+    public StateChangelogWriter<ChangelogStateHandleStreamImpl> createWriter(
+            String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
+        // The StateChangelogWriter will not be used when recovery
+        // under current design of Filesystem-based State Changelog Storage
+        throw new UnsupportedOperationException(
+                "createWriter is not supported for recovery from Changelog");
+    }
+
+    @Override
+    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader() {
+        return new StateChangelogHandleStreamHandleReader(new StateChangeFormat());
+    }
+}

Review Comment:
   Yeah, I agree. I now use `StateChangelogStorageView` to produce the reader, as you suggested.





[GitHub] [flink] Myasuka merged pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
Myasuka merged PR #19142:
URL: https://github.com/apache/flink/pull/19142




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r854505121


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {
+            return loadChangelogStateBackend(originalStateBackend, classLoader, true);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend)
+            throws DynamicCodeLoadingException {
+        try {
+            return Class.forName(CHANGELOG_STATE_BACKEND) == backend.getClass();
+        } catch (ClassNotFoundException e) {
+            throw new DynamicCodeLoadingException(
+                    "Cannot find DelegateStateBackend class: " + CHANGELOG_STATE_BACKEND, e);
+        }
+    }
+
     private static StateBackend loadChangelogStateBackend(
-            StateBackend backend, ClassLoader classLoader) throws DynamicCodeLoadingException {
+            StateBackend backend, ClassLoader classLoader, boolean forSwitch)
+            throws DynamicCodeLoadingException {
 
         // ChangelogStateBackend resides in a separate module, load it using reflection
         try {
             Constructor<? extends DelegatingStateBackend> constructor =
                     Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader)
                             .asSubclass(DelegatingStateBackend.class)
-                            .getDeclaredConstructor(StateBackend.class);
+                            .getDeclaredConstructor(StateBackend.class, boolean.class);
             constructor.setAccessible(true);
-            return constructor.newInstance(backend);
+            return constructor.newInstance(backend, forSwitch);

Review Comment:
   The difficult part :) 
   How about `DeactivatedChangelogStateBackend`?





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r853013989


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {
+            return loadChangelogStateBackend(originalStateBackend, classLoader, true);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend)
+            throws DynamicCodeLoadingException {
+        try {
+            return Class.forName(CHANGELOG_STATE_BACKEND) == backend.getClass();
+        } catch (ClassNotFoundException e) {

Review Comment:
   Agree. We may just need to compare the class names as strings.
   For the second problem, I think `loadStateBackendFromKeyedStateHandles` needs additional checks, but that may deserve more discussion, so for now I have just added some comments in the method.
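
A minimal sketch of the name-based check mentioned above, not the code from the PR. `BackendNameCheckSketch`, the nested `StateBackend` stand-in, `PlainBackend`, and `hasClassName` are hypothetical names for illustration; the value of `CHANGELOG_STATE_BACKEND` is assumed to be the fully qualified name of the changelog backend class.

```java
final class BackendNameCheckSketch {
    /** Stand-in for Flink's StateBackend interface (illustration only). */
    interface StateBackend {}

    /** Hypothetical non-changelog backend used to exercise the check. */
    static final class PlainBackend implements StateBackend {}

    // Assumed to match the constant of the same name in StateBackendLoader.
    static final String CHANGELOG_STATE_BACKEND =
            "org.apache.flink.state.changelog.ChangelogStateBackend";

    /** Compares class names only; the named class is never loaded. */
    static boolean hasClassName(StateBackend backend, String className) {
        return className.equals(backend.getClass().getName());
    }

    /** No Class.forName call, so no ClassNotFoundException to translate. */
    static boolean isChangelogStateBackend(StateBackend backend) {
        return hasClassName(backend, CHANGELOG_STATE_BACKEND);
    }
}
```

Because nothing is loaded reflectively, the method no longer needs to declare `DynamicCodeLoadingException`. A name comparison does not match subclasses, which is the same behavior as the `==` comparison on `Class` objects in the diff.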





[GitHub] [flink] masteryhx commented on pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on PR #19142:
URL: https://github.com/apache/flink/pull/19142#issuecomment-1091815704

   > Thanks for the PR @masteryhx
   > 
   > I have some concerns about the approach taken, i.e. creating and destroying a full-fledged `ChangelogKeyedStateBackend`: (PCIIW)
   > 
   > 1. `StateChangelogStorageFactory` might not be available after recovery (it is necessary to create changelog writer)
   > 2. Some parts of the Changelog backend initialized during the recovery will be lost once it's closed, for example `keyValueStatesByName`. As a result, the nested backend will re-create its state objects.
   > 3. Delegating functions will not be updated, IIUC (see `ChangelogKeyedStateBackend.functionDelegationHelper`)
   > 
   > While running the tests, I see that after recovery no checkpoint (and likely processing) is performed; tasks immediately switch from `RUNNING` to `FINISHED`.
   > 
   > An alternative approach would be to extract code responsible for applying changes and run it directly, without creating `ChangelogKeyedStateBackend`. Most of the code is already extracted in the form of `ChangelogBackendRestoreOperation` and ChangeAppliers. That seems more flexible and less fragile. WDYT?
   
   Thanks for the suggestion!
   I think the 1st problem also exists in your solution.
   For the 2nd problem, you are right, and I have not found a good way to resolve it, so I have adopted your solution.




[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r853020326


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -302,4 +354,14 @@ private long getMaterializationID(KeyedStateHandle keyedStateHandle) {
             return 0L;
         }
     }
+
+    private Collection<ChangelogStateBackendHandle> reboundCheckpoint(
+            Collection<ChangelogStateBackendHandle> stateBackendHandles) {
+        return stateBackendHandles.stream()
+                .map(
+                        changelogStateBackendHandle ->
+                                changelogStateBackendHandle.rebound(
+                                        changelogStateBackendHandle.getCheckpointId()))

Review Comment:
   Agree. I have added a detailed comment.





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r853024865


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -75,27 +77,37 @@
 
     private final StateBackend delegatedStateBackend;
 
+    private final boolean forSwitch;

Review Comment:
   I just split it into two classes.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r854496285


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogRestoreTarget.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
+import org.apache.flink.state.changelog.ChangelogState;
+
+import javax.annotation.Nonnull;
+
+/** Maintains metadata operation related to Changelog recovery. */
+public interface ChangelogRestoreTarget<K> {
+
+    KeyGroupRange getKeyGroupRange();
+
+    <N, S extends State, V> S getOrCreateKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+            throws Exception;
+
+    @Nonnull
+    <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> create(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
+
+    ChangelogState getExistingState(String name, BackendStateType type);
+
+    CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend();

Review Comment:
   Thanks!





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r855863884


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java:
##########
@@ -99,4 +101,24 @@ public static StateChangelogStorage<?> load(
             return factory.createStorage(configuration, metricGroup);
         }
     }
+
+    @Nonnull
+    public static StateChangelogStorageView<?> load(ChangelogStateHandle changelogStateHandle)

Review Comment:
   Replaced `load` with `loadFromStateHandle`.





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r853015290


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {

Review Comment:
   Agree. I have swapped them.
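
A small sketch of why the order of the two conditions matters, under the assumption that the backend check is a cheap boolean once reflection is gone. `CheckOrderSketch`, `needsChangelogWrapper`, and its parameters are hypothetical names, not code from the PR.

```java
import java.util.Collection;
import java.util.function.Predicate;

final class CheckOrderSketch {
    /**
     * Returns true when the restored handles contain a changelog handle but the
     * configured backend is not a changelog backend.
     */
    static <H> boolean needsChangelogWrapper(
            boolean backendIsChangelog,
            Collection<H> handles,
            Predicate<? super H> isChangelogHandle) {
        // Cheap boolean first: && short-circuits, so the scan over all handles
        // is skipped when the backend is already a changelog backend.
        return !backendIsChangelog && handles.stream().anyMatch(isChangelogHandle);
    }
}
```

With the original order, every restore would stream over the handles even when the backend check alone already decides the result.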





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r855087214


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.Test;
+
+/**
+ * This verifies that switching state backend works correctly for Changelog state backend with
+ * materialized state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationSwitchStateBackendITCase

Review Comment:
   > Could you explain what do you mean, why pq states don't support migration?
   
   I mean "state migration".
   Because we do not currently support state migration, the first user access after switching from enabled to disabled would fail in the original implementation. See the TODO in `ChangelogMigrationRestoreTarget#getRestoredKeyedStateBackend`.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r847211776


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)

Review Comment:
   I'd explicitly state in the comment that wrapping a changelog state handle in some other handle is not supported.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {

Review Comment:
   This check is much faster than the 1st one (if reflection is removed); should we swap them?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogRestoreTarget.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
+import org.apache.flink.state.changelog.ChangelogState;
+
+import javax.annotation.Nonnull;
+
+/** Maintains metadata operation related to Changelog recovery. */
+public interface ChangelogRestoreTarget<K> {

Review Comment:
   `@Internal`?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/DelegatedRestoreOperation.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+/** A {@link ChangelogRestoreTarget} supports to restore to the delegated keyed state backend. */
+public class DelegatedRestoreOperation<K> implements ChangelogRestoreTarget<K> {

Review Comment:
   The name isn't very informative IMO.
   I think the name should reflect the purpose, i.e. migration from changelog to non-changelog case.
   So maybe something like `ChangelogMigrationRestoreTarget`?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogRestoreOperation.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+/** A {@link ChangelogRestoreTarget} supports to restore to {@link ChangelogKeyedStateBackend} */
+public class ChangelogRestoreOperation<K> implements ChangelogRestoreTarget<K> {

Review Comment:
   The name collides (mentally) with the existing `ChangelogBackendRestoreOperation`; it's easy to confuse them.
   Furthermore, can't we simply have `ChangelogKeyedStateBackend` implement `ChangelogRestoreTarget` and remove this class?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java:
##########
@@ -0,0 +1,33 @@
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/** Filesystem-based implementation of {@link StateChangelogStorage} just for recovery. */
+@Experimental
+@ThreadSafe
+public class FsStateChangelogStorageForRecovery
+        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
+
+    @Override
+    public StateChangelogWriter<ChangelogStateHandleStreamImpl> createWriter(
+            String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
+        // The StateChangelogWriter will not be used when recovery
+        // under current design of Filesystem-based State Changelog Storage
+        throw new UnsupportedOperationException(
+                "createWriter is not supported for recovery from Changelog");
+    }
+
+    @Override
+    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader() {
+        return new StateChangelogHandleStreamHandleReader(new StateChangeFormat());
+    }
+}

Review Comment:
   The current PR hierarchy looks like this: `StateChangelogStorage -> FsStateChangelogStorageForRecovery -> FsStateChangelogStorageForRecovery`
   
   How about splitting the top-level `StateChangelogStorage` into reader and writer and implementing them separately, so that:
   1. We don't need to implement a method by throwing `UnsupportedOperationException`
   2. We can return read-only interface from `StateChangelogStorageLoader.load`
   3. It's easier to implement the needed part for tests
   
   The new interface could be named read-only, view, or something like that.
   The existing interface can be a completely separate write-only version, or it could extend the read-only counterpart. I think the latter is preferable.
   
   WDYT?
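
The proposed split could look roughly like the sketch below. All names (`StorageSplitSketch`, `StorageView`, `Storage`, `RecoveryStorageView`, `InMemoryStorage`, `loadForRecovery`) and the string-based handles are simplified, hypothetical stand-ins, not the Flink interfaces; the sketch follows the variant in which the full interface extends the read-only one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StorageSplitSketch {
    /** Read-only side: all that recovery needs. */
    interface StorageView {
        List<String> read(String handle);
    }

    /** Full storage extends the view and adds the write side. */
    interface Storage extends StorageView {
        void append(String handle, String change);
    }

    /** Recovery-only implementation: there is no write method to stub out. */
    static final class RecoveryStorageView implements StorageView {
        private final Map<String, List<String>> persisted;

        RecoveryStorageView(Map<String, List<String>> persisted) {
            this.persisted = persisted;
        }

        @Override
        public List<String> read(String handle) {
            return persisted.getOrDefault(handle, Collections.emptyList());
        }
    }

    /** Regular implementation used while the changelog is enabled. */
    static final class InMemoryStorage implements Storage {
        private final Map<String, List<String>> data = new HashMap<>();

        @Override
        public void append(String handle, String change) {
            data.computeIfAbsent(handle, k -> new ArrayList<>()).add(change);
        }

        @Override
        public List<String> read(String handle) {
            return data.getOrDefault(handle, Collections.emptyList());
        }
    }

    /** The recovery loader exposes only the read-only type. */
    static StorageView loadForRecovery(Map<String, List<String>> persisted) {
        return new RecoveryStorageView(persisted);
    }
}
```

Since `loadForRecovery` returns `StorageView`, a caller on the recovery path cannot reach a writer at compile time, and a test double only has to implement `read`.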
   



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -302,4 +354,14 @@ private long getMaterializationID(KeyedStateHandle keyedStateHandle) {
             return 0L;
         }
     }
+
+    private Collection<ChangelogStateBackendHandle> reboundCheckpoint(
+            Collection<ChangelogStateBackendHandle> stateBackendHandles) {
+        return stateBackendHandles.stream()
+                .map(
+                        changelogStateBackendHandle ->
+                                changelogStateBackendHandle.rebound(
+                                        changelogStateBackendHandle.getCheckpointId()))

Review Comment:
   I think the reason for this call is not obvious but is important, so it should be clarified here in the comment.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {
+            return loadChangelogStateBackend(originalStateBackend, classLoader, true);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend)
+            throws DynamicCodeLoadingException {
+        try {
+            return Class.forName(CHANGELOG_STATE_BACKEND) == backend.getClass();
+        } catch (ClassNotFoundException e) {

Review Comment:
   1. Do we really need to instantiate the class here, can't we just check the class name? Or is it because of some classloading issues?
   2. What if the Changelog backend is wrapped by some other backend? Should we check `getDelegatedStateBackend` recursively?
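
For the second question, a recursive (here: iterative) check over the delegation chain might look like the following sketch. The nested interfaces are stand-ins for Flink's `StateBackend` and `DelegatingStateBackend`, modeling only the `getDelegatedStateBackend` accessor named above; `DelegationCheckSketch`, `containsBackendNamed`, and the three backend classes are hypothetical.

```java
final class DelegationCheckSketch {
    /** Stand-in for Flink's StateBackend (illustration only). */
    interface StateBackend {}

    /** Stand-in for Flink's DelegatingStateBackend. */
    interface DelegatingStateBackend extends StateBackend {
        StateBackend getDelegatedStateBackend();
    }

    static final class PlainBackend implements StateBackend {}

    /** Plays the role of the changelog backend in this sketch. */
    static final class ChangelogLikeBackend implements DelegatingStateBackend {
        private final StateBackend delegate;

        ChangelogLikeBackend(StateBackend delegate) {
            this.delegate = delegate;
        }

        @Override
        public StateBackend getDelegatedStateBackend() {
            return delegate;
        }
    }

    /** Some other wrapper that may sit on top of the changelog backend. */
    static final class OtherWrapper implements DelegatingStateBackend {
        private final StateBackend delegate;

        OtherWrapper(StateBackend delegate) {
            this.delegate = delegate;
        }

        @Override
        public StateBackend getDelegatedStateBackend() {
            return delegate;
        }
    }

    /** Walks the delegation chain and reports whether any layer has the given class name. */
    static boolean containsBackendNamed(StateBackend backend, String className) {
        StateBackend current = backend;
        while (current != null) {
            if (className.equals(current.getClass().getName())) {
                return true;
            }
            current = current instanceof DelegatingStateBackend
                    ? ((DelegatingStateBackend) current).getDelegatedStateBackend()
                    : null;
        }
        return false;
    }
}
```

The loop stops at the first non-delegating backend, so it terminates as long as the delegation chain has no cycle.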



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -75,27 +77,37 @@
 
     private final StateBackend delegatedStateBackend;
 
+    private final boolean forSwitch;

Review Comment:
   I agree, the name is not very informative.
   But as I suggested above, maybe it's better to just split this class and get rid of the flag altogether.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.Test;
+
+/**
+ * This verifies that switching state backend works correctly for Changelog state backend with
+ * materialized state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationSwitchStateBackendITCase

Review Comment:
   I think the test doesn't cover reading the changelog for PQ states currently, right?
   I.e. there are no timers/windows in the graph used.



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Maintains the lifecycle of all {@link ChangelogState}s. */
+public class ChangelogStateFactory {
+
+    /**
+     * Unwrapped changelog states used for recovery (not wrapped into e.g. TTL, latency tracking).
+     */
+    private final Map<String, ChangelogState> changelogStates;
+
+    private final Map<String, ChangelogKeyGroupedPriorityQueue<?>> priorityQueueStatesByName;
+
+    public ChangelogStateFactory() {
+        this.changelogStates = new HashMap<>();
+        this.priorityQueueStatesByName = new HashMap<>();
+    }
+
+    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    StateDescriptor.Type.VALUE,
+                                    (StateFactory) ChangelogValueState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.LIST,
+                                    (StateFactory) ChangelogListState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.REDUCING,
+                                    (StateFactory) ChangelogReducingState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.AGGREGATING,
+                                    (StateFactory) ChangelogAggregatingState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.MAP,
+                                    (StateFactory) ChangelogMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    public <K, N, V, S extends State> ChangelogState create(
+            StateDescriptor<S, V> stateDescriptor,
+            InternalKvState<K, N, V> internalKvState,
+            InternalKeyContext<K> keyContext)
+            throws Exception {
+        return create(stateDescriptor, internalKvState, new VoidStateChangeLogger<>(), keyContext);
+    }
+
+    public <K, N, V, S extends State> ChangelogState create(
+            StateDescriptor<S, V> stateDescriptor,
+            InternalKvState<K, N, V> internalKvState,
+            KvStateChangeLogger<V, N> kvStateChangeLogger,
+            InternalKeyContext<K> keyContext)
+            throws Exception {
+        ChangelogState changelogState =
+                getStateFactory(stateDescriptor)
+                        .create(internalKvState, kvStateChangeLogger, keyContext);
+        changelogStates.put(stateDescriptor.getName(), changelogState);
+        return changelogState;
+    }
+
+    public <T> ChangelogKeyGroupedPriorityQueue<T> create(
+            String stateName,
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue,
+            TypeSerializer<T> serializer) {
+        return create(stateName, internalPriorityQueue, new VoidStateChangeLogger<>(), serializer);

Review Comment:
   NIT: `VoidStateChangeLogger` can be a singleton
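
One way to read this nit, as a sketch only: a logger that ignores every call holds no state, so a single shared instance can serve every type combination. `ChangeLogger` below is a hypothetical, trimmed-down interface, and `NoOpChangeLogger` is an illustrative name rather than the PR's `VoidStateChangeLogger`.

```java
// Hypothetical, trimmed-down logger interface used only for this sketch.
interface ChangeLogger<V, N> {
    void valueUpdated(V newValue, N namespace);

    void valueCleared(N namespace);
}

final class NoOpChangeLogger<V, N> implements ChangeLogger<V, N> {

    // The logger is stateless, so one instance is enough for all V and N.
    private static final NoOpChangeLogger<?, ?> INSTANCE = new NoOpChangeLogger<>();

    private NoOpChangeLogger() {}

    @SuppressWarnings("unchecked")
    static <V, N> NoOpChangeLogger<V, N> instance() {
        // The unchecked cast is safe because no method reads or stores a V or an N.
        return (NoOpChangeLogger<V, N>) INSTANCE;
    }

    @Override
    public void valueUpdated(V newValue, N namespace) {
        // Intentionally empty: changes are not recorded.
    }

    @Override
    public void valueCleared(N namespace) {
        // Intentionally empty: changes are not recorded.
    }
}
```

Call sites would then use `NoOpChangeLogger.instance()` instead of allocating a new logger per state, the same pattern the JDK uses for `Collections.emptyList()`.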



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {
+            return loadChangelogStateBackend(originalStateBackend, classLoader, true);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend)
+            throws DynamicCodeLoadingException {
+        try {
+            return Class.forName(CHANGELOG_STATE_BACKEND) == backend.getClass();
+        } catch (ClassNotFoundException e) {
+            throw new DynamicCodeLoadingException(
+                    "Cannot find DelegateStateBackend class: " + CHANGELOG_STATE_BACKEND, e);
+        }
+    }
+
     private static StateBackend loadChangelogStateBackend(
-            StateBackend backend, ClassLoader classLoader) throws DynamicCodeLoadingException {
+            StateBackend backend, ClassLoader classLoader, boolean forSwitch)
+            throws DynamicCodeLoadingException {
 
         // ChangelogStateBackend resides in a separate module, load it using reflection
         try {
             Constructor<? extends DelegatingStateBackend> constructor =
                     Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader)
                             .asSubclass(DelegatingStateBackend.class)
-                            .getDeclaredConstructor(StateBackend.class);
+                            .getDeclaredConstructor(StateBackend.class, boolean.class);
             constructor.setAccessible(true);
-            return constructor.newInstance(backend);
+            return constructor.newInstance(backend, forSwitch);

Review Comment:
   I think it's better to introduce a new class and instantiate it if `forSwitch == true`, so that:
   1. The already fragile contract of the constructor, accepting a nested argument, is not extended
   2. The two classes have clearer responsibilities (read-only / normal restore) and there is no conditional logic inside `ChangelogStateBackend`, and thus less complexity
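
A rough sketch of the split suggested above, under simplifying assumptions: the loader chooses between two classes instead of passing a boolean into one constructor. All names here (`AbstractChangelogBackend`, `LoggingChangelogBackend`, `RestoreOnlyChangelogBackend`, `BackendSelection`) are hypothetical, and `StateBackend` is reduced to a single method so the example is self-contained.

```java
// Hypothetical, simplified stand-in for Flink's StateBackend interface.
interface StateBackend {
    String name();
}

/** Delegation shared by both variants; neither needs a mode flag. */
abstract class AbstractChangelogBackend implements StateBackend {
    protected final StateBackend delegate;

    AbstractChangelogBackend(StateBackend delegate) {
        this.delegate = delegate;
    }

    /** Whether state changes made after restore are appended to the changelog. */
    abstract boolean logsNewChanges();
}

/** Normal mode: restores from the changelog and keeps writing to it. */
final class LoggingChangelogBackend extends AbstractChangelogBackend {
    LoggingChangelogBackend(StateBackend delegate) {
        super(delegate);
    }

    @Override
    boolean logsNewChanges() {
        return true;
    }

    @Override
    public String name() {
        return "changelog(" + delegate.name() + ")";
    }
}

/** Restore-only mode: replays an existing changelog, then stops logging. */
final class RestoreOnlyChangelogBackend extends AbstractChangelogBackend {
    RestoreOnlyChangelogBackend(StateBackend delegate) {
        super(delegate);
    }

    @Override
    boolean logsNewChanges() {
        return false;
    }

    @Override
    public String name() {
        return "restore-only(" + delegate.name() + ")";
    }
}

final class BackendSelection {
    private BackendSelection() {}

    /** Picks a class per mode, so no conditional logic lives inside the backends. */
    static StateBackend select(
            StateBackend configured,
            boolean changelogEnabled,
            boolean checkpointHasChangelogHandles) {
        if (changelogEnabled) {
            return new LoggingChangelogBackend(configured);
        }
        if (checkpointHasChangelogHandles) {
            return new RestoreOnlyChangelogBackend(configured);
        }
        return configured;
    }
}
```

With this shape each reflective constructor lookup keeps the single-argument signature, which is the contract point 1 is concerned about.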



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogRestoreTarget.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
+import org.apache.flink.state.changelog.ChangelogState;
+
+import javax.annotation.Nonnull;
+
+/** Maintains metadata operation related to Changelog recovery. */
+public interface ChangelogRestoreTarget<K> {
+
+    KeyGroupRange getKeyGroupRange();
+
+    <N, S extends State, V> S getOrCreateKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+            throws Exception;
+
+    @Nonnull
+    <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> create(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
+
+    ChangelogState getExistingState(String name, BackendStateType type);
+
+    CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend();

Review Comment:
   1. I think these methods deserve a couple of words of javadoc
   2. The names `getOrCreateKeyedState` and `create` are not very clear. I understand they come from the corresponding existing interfaces, so maybe mention those methods explicitly via a javadoc link, and/or rename them



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myasuka commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r847160050


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -75,27 +77,37 @@
 
     private final StateBackend delegatedStateBackend;
 
+    private final boolean forSwitch;

Review Comment:
   How about renaming it to `restoreAndSwitchOnly`?
   
   Naming is a hard thing.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java:
##########
@@ -391,7 +394,7 @@ static void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputSt
             dos.writeLong(handle.getStateSize());
             dos.writeLong(handle.getCheckpointedSize());
             writeStateHandleId(handle, dos);
-
+            dos.writeUTF(handle.getStorageIdentifier());

Review Comment:
   Why must we introduce a storage identifier here to initialize the changelog storage?
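
For context on what the added `writeUTF` call implies for the format, here is a minimal round-trip sketch using only `java.io`. `HandleMeta` and `HandleMetaCodec` are hypothetical names, the handle is reduced to the two sizes visible in the hunk plus the new identifier, and `"filesystem"` in the usage note is just a sample value. The point it illustrates is that a field appended by the writer must be read back in the same position, so metadata written without the field would not deserialize with this reader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, reduced handle: the two sizes from the hunk plus the new identifier.
final class HandleMeta {
    final long stateSize;
    final long checkpointedSize;
    final String storageIdentifier;

    HandleMeta(long stateSize, long checkpointedSize, String storageIdentifier) {
        this.stateSize = stateSize;
        this.checkpointedSize = checkpointedSize;
        this.storageIdentifier = storageIdentifier;
    }
}

final class HandleMetaCodec {
    private HandleMetaCodec() {}

    /** Writes the fields in a fixed order; the identifier is the newly appended one. */
    static byte[] write(HandleMeta meta) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeLong(meta.stateSize);
            dos.writeLong(meta.checkpointedSize);
            dos.writeUTF(meta.storageIdentifier);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in exactly the order they were written. */
    static HandleMeta read(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            long stateSize = dis.readLong();
            long checkpointedSize = dis.readLong();
            String storageIdentifier = dis.readUTF();
            return new HandleMeta(stateSize, checkpointedSize, storageIdentifier);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Usage would look like `HandleMetaCodec.read(HandleMetaCodec.write(new HandleMeta(10L, 4L, "filesystem")))`, which returns a handle carrying the same three values.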





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r853019852


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogRestoreTarget.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
+import org.apache.flink.state.changelog.ChangelogState;
+
+import javax.annotation.Nonnull;
+
+/** Maintains metadata operation related to Changelog recovery. */
+public interface ChangelogRestoreTarget<K> {
+
+    KeyGroupRange getKeyGroupRange();
+
+    <N, S extends State, V> S getOrCreateKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+            throws Exception;
+
+    @Nonnull
+    <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> create(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
+
+    ChangelogState getExistingState(String name, BackendStateType type);
+
+    CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend();

Review Comment:
   Yeah, I have added comments for all methods in the interface.





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r853023806


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.Test;
+
+/**
+ * This verifies that switching state backend works correctly for Changelog state backend with
+ * materialized state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationSwitchStateBackendITCase

Review Comment:
   Yeah, I have added a timer in the test base.
   I have modified `ChangelogMigrationRestoreTarget#getRestoredKeyedStateBackend` because PQ states do not support migration currently.
   BTW, I also enabled the switching case in `ChangelogCompatibilityITCase`, which uses windows in the graph.





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r855089980


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+
+/** A {@link ChangelogRestoreTarget} supports to migrate to the delegated keyed state backend. */
+public class ChangelogMigrationRestoreTarget<K> implements ChangelogRestoreTarget<K> {
+
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private final ChangelogStateFactory changelogStateFactory;
+
+    private final FunctionDelegationHelper functionDelegationHelper =
+            new FunctionDelegationHelper();
+
+    public ChangelogMigrationRestoreTarget(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ChangelogStateFactory changelogStateFactory) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.changelogStateFactory = changelogStateFactory;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State, V> S createKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+            throws Exception {
+        S keyedState =
+                keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+        functionDelegationHelper.addOrUpdate(stateDescriptor);
+        final InternalKvState<K, N, V> kvState = (InternalKvState<K, N, V>) keyedState;
+        ChangelogState changelogState =
+                changelogStateFactory.create(stateDescriptor, kvState, keyedStateBackend);
+        return (S) changelogState;
+    }
+
+    @Nonnull
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> createPqState(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+        ChangelogKeyGroupedPriorityQueue<T> queue =
+                (ChangelogKeyGroupedPriorityQueue<T>)
+                        changelogStateFactory.getExistingState(
+                                stateName, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+        if (queue == null) {
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue =
+                    keyedStateBackend.create(stateName, byteOrderedElementSerializer);
+            queue =
+                    changelogStateFactory.create(
+                            stateName, internalPriorityQueue, byteOrderedElementSerializer);
+        }
+        return queue;
+    }
+
+    @Override
+    public ChangelogState getExistingState(
+            String name, StateMetaInfoSnapshot.BackendStateType type) {
+        return changelogStateFactory.getExistingState(name, type);
+    }
+
+    @Override
+    public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
+        // TODO: This inner class make the behaviour of the method of create consistent with
+        //  the method of getOrCreateKeyedState currently which could be removed
+        //  after we support state migration (in FLINK-23143).
+        //  It is also used to maintain FunctionDelegationHelper in the delegated state backend.
+        return new AbstractKeyedStateBackend<K>(keyedStateBackend) {
+

Review Comment:
   I think it's a good suggestion. Done.





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r855088425


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {
+            return loadChangelogStateBackend(originalStateBackend, classLoader, true);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend)
+            throws DynamicCodeLoadingException {
+        try {
+            return Class.forName(CHANGELOG_STATE_BACKEND) == backend.getClass();
+        } catch (ClassNotFoundException e) {
+            throw new DynamicCodeLoadingException(
+                    "Cannot find DelegateStateBackend class: " + CHANGELOG_STATE_BACKEND, e);
+        }
+    }
+
     private static StateBackend loadChangelogStateBackend(
-            StateBackend backend, ClassLoader classLoader) throws DynamicCodeLoadingException {
+            StateBackend backend, ClassLoader classLoader, boolean forSwitch)
+            throws DynamicCodeLoadingException {
 
         // ChangelogStateBackend resides in a separate module, load it using reflection
         try {
             Constructor<? extends DelegatingStateBackend> constructor =
                     Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader)
                             .asSubclass(DelegatingStateBackend.class)
-                            .getDeclaredConstructor(StateBackend.class);
+                            .getDeclaredConstructor(StateBackend.class, boolean.class);
             constructor.setAccessible(true);
-            return constructor.newInstance(backend);
+            return constructor.newInstance(backend, forSwitch);

Review Comment:
   I have not found a better name, so I am keeping this one for now.





[GitHub] [flink] masteryhx commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r853018771


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogRestoreOperation.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+/** A {@link ChangelogRestoreTarget} that supports restoring to a {@link ChangelogKeyedStateBackend}. */
+public class ChangelogRestoreOperation<K> implements ChangelogRestoreTarget<K> {

Review Comment:
   Agree. Moved it into `ChangelogKeyedStateBackend`.





[GitHub] [flink] masteryhx commented on pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx commented on PR #19142:
URL: https://github.com/apache/flink/pull/19142#issuecomment-1103559768

   > I think the current PR is much clearer than before, though I have not given it a deep review yet. Since this PR is quite large, can you split it into several commits:
   > 
   > * Modify `ChangelogStateBackend` to support restoring and switch only.
   > * Introduce `ChangelogRestoreTarget` and its implementations.
   > * Introduce `CheckpointBoundKeyedStateHandle#rebound`.
   
   I have split it into four commits to make it easier to review. 
   I will squash them once the PR is approved.
   cc @rkhachatryan 




[GitHub] [flink] masteryhx closed pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
masteryhx closed pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…
URL: https://github.com/apache/flink/pull/19142




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r854515976


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.Test;
+
+/**
+ * This verifies that switching state backend works correctly for Changelog state backend with
+ * materialized state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationSwitchStateBackendITCase

Review Comment:
   > I have added a timer in the test base.
   
   I think the timers are saved, but the correctness of the restored timers is not validated, is it?
   With a "normal" window operator, an incorrectly restored timer might lead to a window not firing, or simply to an exception.
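
As a rough illustration of the kind of check meant here, the sketch below compares the timers that fire after a restore with those the original would have fired. It is plain, self-contained Java, not Flink's test harness: `RestoredTimerCheck`, `TimerStore`, `snapshot`, `restore`, and `advanceTo` are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a timer service; none of these names are Flink APIs. */
public class RestoredTimerCheck {

    /** Minimal timer store: pending timestamps ordered by firing time. */
    static final class TimerStore {
        private final PriorityQueue<Long> timers = new PriorityQueue<>();

        void register(long timestamp) {
            timers.add(timestamp);
        }

        /** Copies the pending timers, as a checkpoint would. */
        List<Long> snapshot() {
            return new ArrayList<>(timers);
        }

        /** Builds a new store from a snapshot, as a restore would. */
        static TimerStore restore(List<Long> snapshot) {
            TimerStore store = new TimerStore();
            snapshot.forEach(store::register);
            return store;
        }

        /** Removes and returns every timer with timestamp <= watermark, in firing order. */
        List<Long> advanceTo(long watermark) {
            List<Long> fired = new ArrayList<>();
            while (!timers.isEmpty() && timers.peek() <= watermark) {
                fired.add(timers.poll());
            }
            return fired;
        }
    }

    public static void main(String[] args) {
        TimerStore original = new TimerStore();
        original.register(30L);
        original.register(10L);
        original.register(20L);

        TimerStore restored = TimerStore.restore(original.snapshot());

        // The restored store must fire exactly what the original would have fired.
        List<Long> expected = original.advanceTo(20L);
        List<Long> actual = restored.advanceTo(20L);
        if (!expected.equals(actual)) {
            throw new AssertionError("restored timers differ: " + actual + " vs " + expected);
        }
        System.out.println("fired after restore: " + actual);
    }
}
```

In an actual ITCase the same idea would presumably be expressed by asserting on the window output produced after the restore, rather than by inspecting a timer queue directly.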
   
   > I have modified `ChangelogMigrationRestoreTarget#getRestoredKeyedStateBackend` because pq states do not currently support migration.
   
   Could you explain what you mean? Why don't pq states support migration?
   
   
   > BTW, I also enabled the switching case in `ChangelogCompatibilityITCase`, which uses windows in the graph.
   
   Good idea! :+1: 
   
   





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r855441945


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+
+/** A {@link ChangelogRestoreTarget} that supports migrating to the delegated keyed state backend. */
+public class ChangelogMigrationRestoreTarget<K> implements ChangelogRestoreTarget<K> {
+
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private final ChangelogStateFactory changelogStateFactory;
+
+    private final FunctionDelegationHelper functionDelegationHelper =
+            new FunctionDelegationHelper();
+
+    public ChangelogMigrationRestoreTarget(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ChangelogStateFactory changelogStateFactory) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.changelogStateFactory = changelogStateFactory;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State, V> S createKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+            throws Exception {
+        S keyedState =
+                keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+        functionDelegationHelper.addOrUpdate(stateDescriptor);
+        final InternalKvState<K, N, V> kvState = (InternalKvState<K, N, V>) keyedState;
+        ChangelogState changelogState =
+                changelogStateFactory.create(stateDescriptor, kvState, keyedStateBackend);
+        return (S) changelogState;
+    }
+
+    @Nonnull
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> createPqState(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+        ChangelogKeyGroupedPriorityQueue<T> queue =
+                (ChangelogKeyGroupedPriorityQueue<T>)
+                        changelogStateFactory.getExistingState(
+                                stateName, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+        if (queue == null) {
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue =
+                    keyedStateBackend.create(stateName, byteOrderedElementSerializer);
+            queue =
+                    changelogStateFactory.create(
+                            stateName, internalPriorityQueue, byteOrderedElementSerializer);
+        }
+        return queue;
+    }
+
+    @Override
+    public ChangelogState getExistingState(
+            String name, StateMetaInfoSnapshot.BackendStateType type) {
+        return changelogStateFactory.getExistingState(name, type);
+    }
+
+    @Override
+    public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
+        // TODO: This anonymous class makes the behaviour of create() consistent with
+        //  getOrCreateKeyedState(). It can be removed once state migration is
+        //  supported (FLINK-23143).
+        //  It is also used to maintain FunctionDelegationHelper in the delegated state backend.
+        return new AbstractKeyedStateBackend<K>(keyedStateBackend) {
+

Review Comment:
   Thanks!


