Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/11 11:43:53 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #19142: [FLINK-23252][changelog] Support recovery from checkpoint after disab…

rkhachatryan commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r847211776


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)

Review Comment:
   I'd explicitly state in the comment that wrapping a changelog state handle in some other handle is not supported.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {

Review Comment:
   This check is much faster than the first one (once the reflection is removed); should we swap them?
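   
   A minimal sketch of the reordering, assuming the backend check becomes a constant-time test. `Handle`, `ChangelogHandle`, `PlainHandle` and `CheckOrder` are hypothetical stand-ins rather than Flink classes; the counter exists only to make the short-circuit visible.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for KeyedStateHandle and ChangelogStateBackendHandle.
interface Handle {}

final class ChangelogHandle implements Handle {}

final class PlainHandle implements Handle {}

final class CheckOrder {
    // Counts inspected handles so the effect of short-circuiting can be observed.
    static final AtomicInteger inspected = new AtomicInteger();

    static boolean needsChangelogWrapper(boolean backendIsChangelog, List<Handle> handles) {
        // Constant-time check first; the linear scan runs only when it passes.
        return !backendIsChangelog
                && handles.stream()
                        .anyMatch(
                                h -> {
                                    inspected.incrementAndGet();
                                    return h instanceof ChangelogHandle;
                                });
    }

    public static void main(String[] args) {
        List<Handle> handles = List.of(new PlainHandle(), new ChangelogHandle());
        System.out.println(needsChangelogWrapper(true, handles));
        System.out.println(needsChangelogWrapper(false, handles));
    }
}
```

   With `&&`, the scan over the handles is skipped entirely whenever the configured backend is already a changelog backend.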



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogRestoreTarget.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
+import org.apache.flink.state.changelog.ChangelogState;
+
+import javax.annotation.Nonnull;
+
+/** Maintains metadata operation related to Changelog recovery. */
+public interface ChangelogRestoreTarget<K> {

Review Comment:
   `@Internal`?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/DelegatedRestoreOperation.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+/** A {@link ChangelogRestoreTarget} supports to restore to the delegated keyed state backend. */
+public class DelegatedRestoreOperation<K> implements ChangelogRestoreTarget<K> {

Review Comment:
   The name isn't very informative IMO.
   I think the name should reflect the purpose, i.e. migration from changelog to non-changelog case.
   So maybe something like `ChangelogMigrationRestoreTarget`?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogRestoreOperation.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+/** A {@link ChangelogRestoreTarget} supports to restore to {@link ChangelogKeyedStateBackend} */
+public class ChangelogRestoreOperation<K> implements ChangelogRestoreTarget<K> {

Review Comment:
   The name collides (mentally) with the existing `ChangelogBackendRestoreOperation`; it's easy to confuse them.
   Furthermore, can't we simply have `ChangelogKeyedStateBackend` implement `ChangelogRestoreTarget` and remove this class?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageForRecovery.java:
##########
@@ -0,0 +1,33 @@
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/** Filesystem-based implementation of {@link StateChangelogStorage} just for recovery. */
+@Experimental
+@ThreadSafe
+public class FsStateChangelogStorageForRecovery
+        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
+
+    @Override
+    public StateChangelogWriter<ChangelogStateHandleStreamImpl> createWriter(
+            String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
+        // The StateChangelogWriter will not be used when recovery
+        // under current design of Filesystem-based State Changelog Storage
+        throw new UnsupportedOperationException(
+                "createWriter is not supported for recovery from Changelog");
+    }
+
+    @Override
+    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader() {
+        return new StateChangelogHandleStreamHandleReader(new StateChangeFormat());
+    }
+}

Review Comment:
   The current PR hierarchy looks like this: `StateChangelogStorage -> FsStateChangelogStorageForRecovery -> FsStateChangelogStorage`
   
   How about splitting the top-level `StateChangelogStorage` into reader and writer and implementing them separately, so that:
   1. We don't need to implement a method by throwing `UnsupportedOperationException`
   2. We can return a read-only interface from `StateChangelogStorageLoader.load`
   3. It's easier to implement the needed part for tests
   
   The new interface could be named read-only, view, or something like that.
   The existing interface can be a completely separate write-only version, or it could extend the read-only counterpart. I think the latter is preferable.
   
   WDYT?
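   
   A rough sketch of the split, assuming the full interface extends the read-only one. Every name here (`ChangelogStorageView`, `ChangelogStorage`, `RecoveryStorage`, `FullStorage`, `StorageLoader`, and the simplified reader/writer signatures) is hypothetical and only illustrates the shape; none of it is the existing Flink API.

```java
// Hypothetical, simplified stand-ins for the reader and writer types.
interface ChangelogReader<H> {
    String read(H handle);
}

interface ChangelogWriter<H> {
    H append(String change);
}

/** Read-only view: everything the recovery path needs. */
interface ChangelogStorageView<H> {
    ChangelogReader<H> createReader();
}

/** Full storage: extends the view with the write path. */
interface ChangelogStorage<H> extends ChangelogStorageView<H> {
    ChangelogWriter<H> createWriter(String operatorId);
}

/** Recovery-only implementation; there is no createWriter left to stub out. */
final class RecoveryStorage implements ChangelogStorageView<String> {
    @Override
    public ChangelogReader<String> createReader() {
        return handle -> "restored:" + handle;
    }
}

/** Regular implementation supporting both reads and writes. */
final class FullStorage implements ChangelogStorage<String> {
    @Override
    public ChangelogReader<String> createReader() {
        return handle -> "restored:" + handle;
    }

    @Override
    public ChangelogWriter<String> createWriter(String operatorId) {
        return change -> operatorId + "/" + change;
    }
}

final class StorageLoader {
    // Returning the view type keeps recovery-only callers away from the writer.
    static ChangelogStorageView<String> loadForRecovery() {
        return new RecoveryStorage();
    }

    public static void main(String[] args) {
        System.out.println(loadForRecovery().createReader().read("h1"));
    }
}
```

   Tests that only need the read side could then implement `ChangelogStorageView` alone.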
   



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -302,4 +354,14 @@ private long getMaterializationID(KeyedStateHandle keyedStateHandle) {
             return 0L;
         }
     }
+
+    private Collection<ChangelogStateBackendHandle> reboundCheckpoint(
+            Collection<ChangelogStateBackendHandle> stateBackendHandles) {
+        return stateBackendHandles.stream()
+                .map(
+                        changelogStateBackendHandle ->
+                                changelogStateBackendHandle.rebound(
+                                        changelogStateBackendHandle.getCheckpointId()))

Review Comment:
   I think the reason for this call is not obvious but is important, so it should be clarified here in the comment.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {
+            return loadChangelogStateBackend(originalStateBackend, classLoader, true);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend)
+            throws DynamicCodeLoadingException {
+        try {
+            return Class.forName(CHANGELOG_STATE_BACKEND) == backend.getClass();
+        } catch (ClassNotFoundException e) {

Review Comment:
   1. Do we really need to load the class here? Can't we just check the class name? Or is it because of some classloading issues?
   2. What if the Changelog backend is wrapped by some other backend? Should we check `getDelegatedStateBackend` recursively?
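   
   Both points could be combined roughly as follows: compare class names instead of loading the class, and walk the delegation chain. This is only a sketch; `Backend`, `DelegatingBackend` and the concrete classes are hypothetical stand-ins that mirror the shape of `StateBackend` and `getDelegatedStateBackend`, and a name comparison would not match subclasses.

```java
// Hypothetical stand-ins for StateBackend and DelegatingStateBackend.
interface Backend {}

interface DelegatingBackend extends Backend {
    Backend getDelegatedStateBackend();
}

final class HashMapBackend implements Backend {}

final class ChangelogBackend implements DelegatingBackend {
    private final Backend delegate;

    ChangelogBackend(Backend delegate) {
        this.delegate = delegate;
    }

    @Override
    public Backend getDelegatedStateBackend() {
        return delegate;
    }
}

final class OtherWrapper implements DelegatingBackend {
    private final Backend delegate;

    OtherWrapper(Backend delegate) {
        this.delegate = delegate;
    }

    @Override
    public Backend getDelegatedStateBackend() {
        return delegate;
    }
}

final class BackendChecks {
    // In a real loader this would be a plain string constant, since the class
    // lives in another module; it is derived here to keep the sketch self-contained.
    static final String CHANGELOG_CLASS_NAME = ChangelogBackend.class.getName();

    /** Returns true if the backend, or any backend it delegates to, is the changelog backend. */
    static boolean isOrWrapsChangelog(Backend backend) {
        Backend current = backend;
        while (current != null) {
            if (current.getClass().getName().equals(CHANGELOG_CLASS_NAME)) {
                return true;
            }
            current =
                    current instanceof DelegatingBackend
                            ? ((DelegatingBackend) current).getDelegatedStateBackend()
                            : null;
        }
        return false;
    }

    public static void main(String[] args) {
        Backend nested = new OtherWrapper(new ChangelogBackend(new HashMapBackend()));
        System.out.println(isOrWrapsChangelog(nested));
    }
}
```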



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -75,27 +77,37 @@
 
     private final StateBackend delegatedStateBackend;
 
+    private final boolean forSwitch;

Review Comment:
   I agree, the name is not very informative.
   But as I suggested above, maybe it's better to just split this class and get rid of the flag altogether.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.Test;
+
+/**
+ * This verifies that switching state backend works correctly for Changelog state backend with
+ * materialized state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationSwitchStateBackendITCase

Review Comment:
   I think the test doesn't cover reading the changelog for PQ states currently, right?
   I.e. there are no timers/windows in the graph used.



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Maintains the lifecycle of all {@link ChangelogState}s. */
+public class ChangelogStateFactory {
+
+    /**
+     * Unwrapped changelog states used for recovery (not wrapped into e.g. TTL, latency tracking).
+     */
+    private final Map<String, ChangelogState> changelogStates;
+
+    private final Map<String, ChangelogKeyGroupedPriorityQueue<?>> priorityQueueStatesByName;
+
+    public ChangelogStateFactory() {
+        this.changelogStates = new HashMap<>();
+        this.priorityQueueStatesByName = new HashMap<>();
+    }
+
+    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    StateDescriptor.Type.VALUE,
+                                    (StateFactory) ChangelogValueState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.LIST,
+                                    (StateFactory) ChangelogListState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.REDUCING,
+                                    (StateFactory) ChangelogReducingState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.AGGREGATING,
+                                    (StateFactory) ChangelogAggregatingState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.MAP,
+                                    (StateFactory) ChangelogMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    public <K, N, V, S extends State> ChangelogState create(
+            StateDescriptor<S, V> stateDescriptor,
+            InternalKvState<K, N, V> internalKvState,
+            InternalKeyContext<K> keyContext)
+            throws Exception {
+        return create(stateDescriptor, internalKvState, new VoidStateChangeLogger<>(), keyContext);
+    }
+
+    public <K, N, V, S extends State> ChangelogState create(
+            StateDescriptor<S, V> stateDescriptor,
+            InternalKvState<K, N, V> internalKvState,
+            KvStateChangeLogger<V, N> kvStateChangeLogger,
+            InternalKeyContext<K> keyContext)
+            throws Exception {
+        ChangelogState changelogState =
+                getStateFactory(stateDescriptor)
+                        .create(internalKvState, kvStateChangeLogger, keyContext);
+        changelogStates.put(stateDescriptor.getName(), changelogState);
+        return changelogState;
+    }
+
+    public <T> ChangelogKeyGroupedPriorityQueue<T> create(
+            String stateName,
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue,
+            TypeSerializer<T> serializer) {
+        return create(stateName, internalPriorityQueue, new VoidStateChangeLogger<>(), serializer);

Review Comment:
   NIT: `VoidStateChangeLogger` can be a singleton
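   
   One possible shape for this, assuming the logger is stateless. `ChangeLogger` and `VoidChangeLogger` are hypothetical, simplified stand-ins and not the real `KvStateChangeLogger` API.

```java
// Hypothetical, simplified logger interface.
interface ChangeLogger<V> {
    void valueUpdated(V newValue);
}

final class VoidChangeLogger<V> implements ChangeLogger<V> {
    // One shared instance for all type arguments.
    private static final VoidChangeLogger<?> INSTANCE = new VoidChangeLogger<>();

    private VoidChangeLogger() {}

    @SuppressWarnings("unchecked")
    static <V> VoidChangeLogger<V> getInstance() {
        // The unchecked cast is safe because the instance never reads or stores a V.
        return (VoidChangeLogger<V>) INSTANCE;
    }

    @Override
    public void valueUpdated(V newValue) {
        // Intentionally a no-op.
    }

    public static void main(String[] args) {
        ChangeLogger<String> logger = VoidChangeLogger.getInstance();
        logger.valueUpdated("ignored");
    }
}
```

   This is the same pattern that `Collections.emptyList()` uses for its shared immutable instance.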



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,17 +360,51 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        if (keyedStateHandles.stream()
+                        .anyMatch(stateHandle -> stateHandle instanceof ChangelogStateBackendHandle)
+                && !isChangelogStateBackend(originalStateBackend)) {
+            return loadChangelogStateBackend(originalStateBackend, classLoader, true);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend)
+            throws DynamicCodeLoadingException {
+        try {
+            return Class.forName(CHANGELOG_STATE_BACKEND) == backend.getClass();
+        } catch (ClassNotFoundException e) {
+            throw new DynamicCodeLoadingException(
+                    "Cannot find DelegateStateBackend class: " + CHANGELOG_STATE_BACKEND, e);
+        }
+    }
+
     private static StateBackend loadChangelogStateBackend(
-            StateBackend backend, ClassLoader classLoader) throws DynamicCodeLoadingException {
+            StateBackend backend, ClassLoader classLoader, boolean forSwitch)
+            throws DynamicCodeLoadingException {
 
         // ChangelogStateBackend resides in a separate module, load it using reflection
         try {
             Constructor<? extends DelegatingStateBackend> constructor =
                     Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader)
                             .asSubclass(DelegatingStateBackend.class)
-                            .getDeclaredConstructor(StateBackend.class);
+                            .getDeclaredConstructor(StateBackend.class, boolean.class);
             constructor.setAccessible(true);
-            return constructor.newInstance(backend);
+            return constructor.newInstance(backend, forSwitch);

Review Comment:
   I think it's better to introduce a new class and instantiate it if `forSwitch == true`, so that:
   1. The already fragile contract of the constructor, which accepts a nested argument, is not extended
   2. The two classes have clearer responsibilities (read-only / normal restore) and there is no conditional logic inside `ChangelogStateBackend`, which means less complexity
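   
   A sketch of what the split might look like, with the loader picking the class instead of passing a flag. All types here (`SimpleBackend`, `HeapBackend`, `WritingChangelogBackend`, `RestoreOnlyChangelogBackend`, `BackendLoader`) are hypothetical, and the string-returning `restore` method is a placeholder for the actual restore logic.

```java
// Hypothetical stand-in for StateBackend with a placeholder restore method.
interface SimpleBackend {
    String restore(String handle);
}

final class HeapBackend implements SimpleBackend {
    @Override
    public String restore(String handle) {
        return "heap(" + handle + ")";
    }
}

/** Normal case: restores from the changelog and keeps writing it afterwards. */
final class WritingChangelogBackend implements SimpleBackend {
    private final SimpleBackend delegate;

    WritingChangelogBackend(SimpleBackend delegate) {
        this.delegate = delegate;
    }

    @Override
    public String restore(String handle) {
        return "changelog[" + delegate.restore(handle) + "]";
    }
}

/** Switch case: replays the changelog into the delegate and hands back plain delegate state. */
final class RestoreOnlyChangelogBackend implements SimpleBackend {
    private final SimpleBackend delegate;

    RestoreOnlyChangelogBackend(SimpleBackend delegate) {
        this.delegate = delegate;
    }

    @Override
    public String restore(String handle) {
        return delegate.restore(handle);
    }
}

final class BackendLoader {
    // The branch lives in the loader, so neither backend class needs a flag.
    static SimpleBackend load(
            SimpleBackend configured, boolean handlesContainChangelog, boolean changelogEnabled) {
        if (changelogEnabled) {
            return new WritingChangelogBackend(configured);
        }
        if (handlesContainChangelog) {
            return new RestoreOnlyChangelogBackend(configured);
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(load(new HeapBackend(), true, false).restore("h1"));
    }
}
```

   Each constructor keeps the single-argument shape, so the reflective lookup would only need a different class name per case.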



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogRestoreTarget.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
+import org.apache.flink.state.changelog.ChangelogState;
+
+import javax.annotation.Nonnull;
+
+/** Maintains metadata operation related to Changelog recovery. */
+public interface ChangelogRestoreTarget<K> {
+
+    KeyGroupRange getKeyGroupRange();
+
+    <N, S extends State, V> S getOrCreateKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
+            throws Exception;
+
+    @Nonnull
+    <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> create(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
+
+    ChangelogState getExistingState(String name, BackendStateType type);
+
+    CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend();

Review Comment:
   1. I think these methods deserve a couple of words of javadoc
   2. The names `getOrCreateKeyedState` and `create` are not very clear. I understand they come from the corresponding existing interfaces, so maybe mention those methods explicitly via a javadoc link, and/or rename them



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
