Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/29 03:19:26 UTC

[GitHub] [flink] curcur opened a new pull request #14799: Wrapped StateBackend to forward state changes to StateChangleLog

curcur opened a new pull request #14799:
URL: https://github.com/apache/flink/pull/14799


   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
     - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
     - *Deployments RPC transmits only the blob storage reference*
     - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
  - *Manually verified the change by running a 4-node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   ## CI report:
   
   * 89066112157569ad58908e401696b45a56f48fc3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12693) 
   * bc4cd266c548473fdd31bad5018ae210048d0472 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12831) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r576016613



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
##########
@@ -31,7 +31,7 @@
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
  */
-class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V>
+public class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V>

Review comment:
       good catch







[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567014439



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ProxyListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ProxyReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ProxyAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class, (StateFactory) ProxyMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    public ProxyKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend) {
+        super(
+                keyedStateBackend.kvStateRegistry,
+                keyedStateBackend.keySerializer,
+                keyedStateBackend.userCodeClassLoader,
+                keyedStateBackend.executionConfig,
+                keyedStateBackend.ttlTimeProvider,
+                keyedStateBackend.cancelStreamRegistry,
+                keyedStateBackend.keyGroupCompressionDecorator,
+                keyedStateBackend.keyContext);
+        this.keyedStateBackend = keyedStateBackend;

Review comment:
       In the end, we end up with 4 backends:
   ```
    Proxy extends Abstract (has its own fields AND refers to passed fields)
        Proxied extends Abstract (has its own fields AND refers to passed fields)
   ```
   Right?
   
   I see the following issues with this:
   1. Double caching and lookup in `keyValueStatesByName`
   2. Error-prone: 4 backend classes refer to the same `kvStateRegistry`, `keySerializer` etc. making it easy to either call some method twice or not call it for a wrapped backend (e.g. `setCurrentKey` isn't called for the wrapped backend)
   3. To me this hierarchy is very confusing
   
   ~Why not refactor this in some way or another? Can `AbstractKeyedStateBackend` be also a proxy (i.e. replace inheritance with delegation)?~
   
   edit:
   I see `keyValueStatesByName` is duplicated but one instance isn't actually used. This moves the weight from (1) to (3) :) 
   
    Why can't `ProxyKeyedStateBackend` implement `CheckpointableKeyedStateBackend` directly?
    All it has to override is `getPartitionedState`; everything else could just be delegated, right?
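
A minimal sketch of the delegation idea discussed here, using simplified stand-in interfaces rather than Flink's real `CheckpointableKeyedStateBackend` API (all names below are hypothetical, not code from this PR):

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-ins for the keyed state backend and value state interfaces. */
interface SimpleValueState<V> {
    V value();

    void update(V value);
}

interface SimpleKeyedStateBackend<K> {
    void setCurrentKey(K key);

    K getCurrentKey();

    <V> SimpleValueState<V> getValueState(String name);
}

/** Delegation instead of inheritance: forward everything, wrap only the returned state objects. */
class ProxyKeyedStateBackendSketch<K> implements SimpleKeyedStateBackend<K> {

    private final SimpleKeyedStateBackend<K> delegate;

    // single cache of wrapped states, avoiding the double caching mentioned above
    private final Map<String, SimpleValueState<?>> wrappedStatesByName = new HashMap<>();

    ProxyKeyedStateBackendSketch(SimpleKeyedStateBackend<K> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void setCurrentKey(K key) {
        delegate.setCurrentKey(key); // forwarded exactly once, no shared mutable fields
    }

    @Override
    public K getCurrentKey() {
        return delegate.getCurrentKey();
    }

    @Override
    @SuppressWarnings("unchecked")
    public <V> SimpleValueState<V> getValueState(String name) {
        return (SimpleValueState<V>)
                wrappedStatesByName.computeIfAbsent(name, n -> wrap(delegate.getValueState(n)));
    }

    private <V> SimpleValueState<V> wrap(SimpleValueState<V> inner) {
        return new SimpleValueState<V>() {
            @Override
            public V value() {
                return inner.value();
            }

            @Override
            public void update(V value) {
                inner.update(value); // a changelog proxy would also record the change here
            }
        };
    }
}
```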







[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r578323324



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -162,14 +163,19 @@ public static CheckpointStorage load(
         Preconditions.checkNotNull(classLoader, "classLoader");
         Preconditions.checkNotNull(configuredStateBackend, "statebackend");
 
-        if (configuredStateBackend instanceof CheckpointStorage) {
+        StateBackend rootStateBackend =
+                (configuredStateBackend instanceof ProxyStateBackend)
+                        ? ((ProxyStateBackend) configuredStateBackend).getProxiedStateBackend()
+                        : configuredStateBackend;

Review comment:
       This is the only place that needs to unwrap the "state backend", and that is because some of the backends are both a state backend and a checkpoint storage (for now).
   
   There are some other places that check whether it is an XXXKeyedStateBackend, which might be confused with this one.







[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   ## CI report:
   
   * 0b3d7cc3dc22cc1ba08d6c37c9d50aee3bc1ee46 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12911) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   ## CI report:
   
   * 89066112157569ad58908e401696b45a56f48fc3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12693) 
   * bc4cd266c548473fdd31bad5018ae210048d0472 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r570555194



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class ProxyKeyedStateBackend<K>
+        implements CheckpointableKeyedStateBackend<K>, CheckpointListener {
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ProxyListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ProxyReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ProxyAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class, (StateFactory) ProxyMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    // ==============================================================
+    //  cache maintained by the proxyKeyedStateBackend itself
+    //  not the same as the underlying wrapped keyedStateBackend
+    //  InternalKvState is a ProxyXXState, XX stands for Value, List ...
+    /** So that we can give out state when the user uses the same key. */
+    protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+
+    @SuppressWarnings("rawtypes")
+    protected InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    protected String lastName;
+
+    // ==============================================================
+    // ==== the same as the wrapped keyedStateBackend
+
+    public final ExecutionConfig executionConfig;
+
+    public final TtlTimeProvider ttlTimeProvider;
+
+    public ProxyKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, T> stateDescriptor,
+            KeyedStateFunction<K, S> function)
+            throws Exception {
+        try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
+
+            final S state = getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
+
+            if (keyedStateBackend.supportConcurrentModification()) {

Review comment:
       Shouldn't the name be `notSupportConcurrentModification` (reversed)?
   If true, then we iterate without copying to a list.
   
   Just out of curiosity: did you actually get an exception here with HeapStateBackend? Which test?
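
For illustration, a generic sketch of the two iteration strategies being discussed; this is not the PR's `applyToAllKeys` code, and the flag and helper names are made up:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class IterationSketch {

    /**
     * Applies an action that may modify the underlying collection: iterate the live view
     * only if the source tolerates concurrent modification, otherwise copy the elements first.
     */
    static <T> void applyToAll(
            Iterable<T> source, boolean supportsConcurrentModification, Consumer<T> action) {
        if (supportsConcurrentModification) {
            // safe to act while iterating the live view
            for (T element : source) {
                action.accept(element);
            }
        } else {
            // copy first so the action cannot trigger a ConcurrentModificationException
            List<T> copy = new ArrayList<>();
            source.forEach(copy::add);
            copy.forEach(action);
        }
    }
}
```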

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -204,6 +216,11 @@ public static StateBackend fromApplicationOrConfigOrDefault(
 
         // (1) the application defined state backend has precedence
         if (fromApplication != null) {
+
+            checkArgument(
+                    !(fromApplication instanceof ProxyStateBackend),

Review comment:
       This will not detect `new StubBackend(new ProxyBackend())` :)
   I suggested above adding an `unwrap` method, which could also help here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -242,6 +259,34 @@ public static StateBackend fromApplicationOrConfigOrDefault(
         return backend;
     }
 
+    public static StateBackend loadStateBackend(
+            @Nullable StateBackend fromApplication,
+            Configuration config,
+            ClassLoader classLoader,
+            @Nullable Logger logger)
+            throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+        final StateBackend backend =
+                fromApplicationOrConfigOrDefault(fromApplication, config, classLoader, logger);
+
+        if (needToProxyStateBackend(backend, config)) {
+            LOG.info(
+                    "Proxy State Backend used, and the root State Backend is {}",
+                    backend.getClass().getSimpleName());
+            return new ProxyStateBackend(backend);

Review comment:
       I think this class is the right place to decide and create `ProxyStateBackend` :+1: 

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
##########
@@ -99,23 +101,30 @@ private InternalTimeServiceManagerImpl(
      * <p><b>IMPORTANT:</b> Keep in sync with {@link InternalTimeServiceManager.Provider}.
      */
     public static <K> InternalTimeServiceManagerImpl<K> create(
-            CheckpointableKeyedStateBackend<K> keyedStatedBackend,
+            CheckpointableKeyedStateBackend<K> keyedStateBackend,
             ClassLoader userClassloader,
             KeyContext keyContext,
             ProcessingTimeService processingTimeService,
             Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates)
             throws Exception {
-        final KeyGroupRange keyGroupRange = keyedStatedBackend.getKeyGroupRange();
+        final KeyGroupRange keyGroupRange = keyedStateBackend.getKeyGroupRange();
+
+        KeyedStateBackend<?> rootKeyedStateBackend =
+                keyedStateBackend instanceof ProxyKeyedStateBackend
+                        ? ((ProxyKeyedStateBackend<?>) keyedStateBackend)
+                                .getProxiedKeyedStateBackend()
+                        : keyedStateBackend;
+
         final boolean requiresSnapshotLegacyTimers =
-                keyedStatedBackend instanceof AbstractKeyedStateBackend
-                        && ((AbstractKeyedStateBackend<K>) keyedStatedBackend)
+                rootKeyedStateBackend instanceof AbstractKeyedStateBackend
+                        && ((AbstractKeyedStateBackend<K>) rootKeyedStateBackend)

Review comment:
       WDYT about pulling `requiresLegacySynchronousTimerSnapshots` up to the interface?
   Then, with `unwrap` added, we could just call
   `rootKeyedStateBackend.unwrap().requiresLegacySynchronousTimerSnapshots()`.
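
A small sketch of what pulling the method up to the interface could look like, with made-up names rather than the actual Flink interfaces:

```java
// The interface carries a default, so most implementations need no code change.
interface KeyedBackendLike {
    default boolean requiresLegacySynchronousTimerSnapshots() {
        return false;
    }
}

// A backend that still needs the legacy synchronous timer snapshot path overrides the default.
class LegacyTimerBackendLike implements KeyedBackendLike {
    @Override
    public boolean requiresLegacySynchronousTimerSnapshots() {
        return true;
    }
}

class CallSiteSketch {
    static boolean requiresSnapshotLegacyTimers(KeyedBackendLike backend) {
        // no instanceof check or cast to an abstract base class is needed anymore
        return backend.requiresLegacySynchronousTimerSnapshots();
    }
}
```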

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -162,14 +163,19 @@ public static CheckpointStorage load(
         Preconditions.checkNotNull(classLoader, "classLoader");
         Preconditions.checkNotNull(configuredStateBackend, "statebackend");
 
-        if (configuredStateBackend instanceof CheckpointStorage) {
+        StateBackend rootStateBackend =
+                (configuredStateBackend instanceof ProxyStateBackend)
+                        ? ((ProxyStateBackend) configuredStateBackend).getProxiedStateBackend()
+                        : configuredStateBackend;

Review comment:
       I see this fragment several times in your PR. If we later add one more layer (say logging or a new TTL implementation), this can break (we already have `StubStateBackend`).
   
   WDYT about adding a `default StateBackend unwrap()` method to the interface?
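
A tiny sketch of the suggested `unwrap()` default method, using stand-in types rather than Flink's `StateBackend` interface:

```java
// Plain backends return themselves; wrappers recurse, so nested wrappers such as
// Stub(Proxy(backend)) still unwrap down to the innermost backend.
interface StateBackendLike {
    default StateBackendLike unwrap() {
        return this;
    }
}

class PlainBackendLike implements StateBackendLike {}

class WrappingBackendLike implements StateBackendLike {
    private final StateBackendLike wrapped;

    WrappingBackendLike(StateBackendLike wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public StateBackendLike unwrap() {
        return wrapped.unwrap(); // keep unwrapping through any further wrappers
    }
}
```

With something like this, the `instanceof ProxyStateBackend` fragments above could become a single `unwrap()` call, regardless of how many wrappers are stacked.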

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -47,6 +52,13 @@
     //  Configuration shortcut names
     // ------------------------------------------------------------------------
 
+    private static final Set<String> STATE_BACKEND_CAN_BE_PROXIED =
+            new HashSet<>(
+                    Arrays.asList(
+                            "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",

Review comment:
       Why do we want to limit proxying to these classes?
   Besides that, hardcoding class names like this is a bit fragile (`RocksDBStateBackend.class`?)
   
   At least one upcoming PR already adds new classes that should be proxied.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
##########
@@ -31,7 +31,7 @@
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
  */
-class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V>
+public class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V>

Review comment:
       Why is this change needed?







[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567978393



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,

Review comment:
       1. Yes, because of unnecessary complexity (plus `instanceof` is more flexible than keys of `class` type).
   2. Can you explain why it is more flexible and extensible than just `if-else`? And why is the same level of flexibility and extensibility needed in both the wrapping and the wrapped backends?
   3. I'd prefer such a refactoring (if any) to be performed before adding a new backend.
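
To make the trade-off concrete, a simplified sketch of the two dispatch styles; the types below are stand-ins, not the PR's `StateDescriptor` subclasses or `StateFactory`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class DispatchSketch {

    interface Descriptor {}

    static final class ValueDescriptor implements Descriptor {}

    static final class ListDescriptor implements Descriptor {}

    // Style 1: a static map keyed by descriptor class, as in the diff above.
    private static final Map<Class<? extends Descriptor>, Function<Descriptor, String>> FACTORIES =
            new HashMap<>();

    static {
        FACTORIES.put(ValueDescriptor.class, d -> "proxy value state");
        FACTORIES.put(ListDescriptor.class, d -> "proxy list state");
    }

    static String createViaMap(Descriptor descriptor) {
        Function<Descriptor, String> factory = FACTORIES.get(descriptor.getClass());
        if (factory == null) {
            throw new IllegalArgumentException("Unsupported descriptor: " + descriptor.getClass());
        }
        return factory.apply(descriptor);
    }

    // Style 2: plain instanceof dispatch, as suggested in the review.
    static String createViaInstanceof(Descriptor descriptor) {
        if (descriptor instanceof ValueDescriptor) {
            return "proxy value state";
        } else if (descriptor instanceof ListDescriptor) {
            return "proxy list state";
        }
        throw new IllegalArgumentException("Unsupported descriptor: " + descriptor.getClass());
    }
}
```

Note that the map lookup only matches the exact runtime class of the descriptor, while `instanceof` also matches subclasses, which is the flexibility difference mentioned in point 1.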







[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567704413



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,

Review comment:
       I think different backends may have completely different implementations. I can imagine some backends **do** register state factories dynamically (and therefore need this complexity).
   So I don't think it's necessary for them to share the internals.
   
   But if we **do** want this code to be shared, then a better way would be to extract it and re-use it.







[GitHub] [flink] curcur commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567556094



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ProxyListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ProxyReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ProxyAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class, (StateFactory) ProxyMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    public ProxyKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend) {
+        super(
+                keyedStateBackend.kvStateRegistry,
+                keyedStateBackend.keySerializer,
+                keyedStateBackend.userCodeClassLoader,
+                keyedStateBackend.executionConfig,
+                keyedStateBackend.ttlTimeProvider,
+                keyedStateBackend.cancelStreamRegistry,
+                keyedStateBackend.keyGroupCompressionDecorator,
+                keyedStateBackend.keyContext);
+        this.keyedStateBackend = keyedStateBackend;

Review comment:
       All these questions are great! For all the reasons mentioned (and not mentioned) above, I feel this hierarchy is confusing as well, so I've spent some time over the weekend simplifying it.
   
   Originally, the reason I extended `AbstractKeyedStateBackend` was that I thought all fields had to be wrapped and delegated. So if I implemented `CheckpointableKeyedStateBackend`, I would basically rewrite everything in `AbstractKeyedStateBackend`.
   
   But that is not true: only the InternalKvState-related parts (the state representation used by internal ops) have to be wrapped; everything else (keys, namespaces, etc.) is kept untouched.
   
   So I reimplemented this to `implements CheckpointableKeyedStateBackend`. With a bit more duplicated code, I think this is much simpler and cleaner.
   
   **However**, the problem that `keyValueStatesByName` is duplicated while one instance isn't actually used is still there. I am still not happy with this part, but it is much better.







[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   ## CI report:
   
   * bc4cd266c548473fdd31bad5018ae210048d0472 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12831) 
   * 0b3d7cc3dc22cc1ba08d6c37c9d50aee3bc1ee46 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12911) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r575780943



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -47,6 +52,13 @@
     //  Configuration shortcut names
     // ------------------------------------------------------------------------
 
+    private static final Set<String> STATE_BACKEND_CAN_BE_PROXIED =
+            new HashSet<>(
+                    Arrays.asList(
+                            "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",

Review comment:
       The reason is that RocksDBStateBackend lives in a different module (a dependency problem), so you cannot reference "RocksDBStateBackend.class" here directly.
   
   The loader is in flink-runtime, and the rocksdb module depends on flink-runtime, not the other way around.
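   
   A minimal, self-contained sketch of the pattern being described; the class and method names below are illustrative only, not the real StateBackendLoader code:

```java
import java.util.Collections;
import java.util.Set;

/**
 * Sketch: flink-runtime cannot reference RocksDBStateBackend.class at compile
 * time, because the rocksdb backend module itself depends on flink-runtime.
 * Comparing fully qualified class names avoids that circular dependency.
 */
public final class ProxyableBackendsSketch {

    // Plain strings, so no compile-time dependency on the rocksdb module is needed.
    private static final Set<String> STATE_BACKEND_CAN_BE_PROXIED =
            Collections.singleton(
                    "org.apache.flink.contrib.streaming.state.RocksDBStateBackend");

    /** Returns true if the configured backend is one the loader knows how to proxy. */
    static boolean canBeProxied(Object stateBackend) {
        return STATE_BACKEND_CAN_BE_PROXIED.contains(stateBackend.getClass().getName());
    }

    private ProxyableBackendsSketch() {}
}
```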




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * cf913a7fa26e306a78febf0ad7b5fb9e8d48288d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647) 
   * 7ee60d64feb3da9e9fc24e08482260d4b021c65f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r575781718



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class ProxyKeyedStateBackend<K>
+        implements CheckpointableKeyedStateBackend<K>, CheckpointListener {
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ProxyListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ProxyReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ProxyAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class, (StateFactory) ProxyMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    // ==============================================================
+    //  cache maintained by the proxyKeyedStateBackend itself
+    //  not the same as the underlying wrapped keyedStateBackend
+    //  InternalKvState is a ProxyXXState, XX stands for Value, List ...
+    /** So that we can give out state when the user uses the same key. */
+    protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+
+    @SuppressWarnings("rawtypes")
+    protected InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    protected String lastName;
+
+    // ==============================================================
+    // ==== the same as the wrapped keyedStateBackend
+
+    public final ExecutionConfig executionConfig;
+
+    public final TtlTimeProvider ttlTimeProvider;
+
+    public ProxyKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, T> stateDescriptor,
+            KeyedStateFunction<K, S> function)
+            throws Exception {
+        try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
+
+            final S state = getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
+
+            if (keyedStateBackend.supportConcurrentModification()) {

Review comment:
       If the backend supports concurrent modification, there is no need to collect the full list of keys up front; collecting the keys first is what the `else` branch does for backends whose iterators cannot tolerate modification.
   
   More details on what "ConcurrentModification" refers to are in [FLINK-9060][state] "Fix concurrent modification exception when iterating keys": it means concurrent modification of the key-value entries of the state. StateTable is a HashTable (or something similar, I do not remember the details), which does not support concurrent modification.
   
   Test case for this: yes,
   `StateBackendTestBase#testConcurrentModificationWithApplyToAllKeys`
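   
   A minimal sketch of the two strategies described above, using a simplified stand-in interface rather than the real Flink backend types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

/** Sketch only; KeyedBackend is a stand-in, not the real Flink interface. */
public class ApplyToAllKeysSketch {

    interface KeyedBackend<K> {
        Stream<K> getKeys();
        /** True if state entries may be modified while getKeys() is being iterated. */
        boolean supportConcurrentModification();
    }

    static <K> void applyToAllKeys(KeyedBackend<K> backend, Consumer<K> function) {
        if (backend.supportConcurrentModification()) {
            // The backend tolerates modification during iteration,
            // so keys can be streamed lazily without copying them first.
            try (Stream<K> keyStream = backend.getKeys()) {
                keyStream.forEach(function);
            }
        } else {
            // E.g. a heap StateTable: materialize the keys up front so the
            // iteration itself never observes a concurrent modification.
            final List<K> keys = new ArrayList<>();
            try (Stream<K> keyStream = backend.getKeys()) {
                keyStream.forEach(keys::add);
            }
            keys.forEach(function);
        }
    }
}
```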




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r575781157



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -47,6 +52,13 @@
     //  Configuration shortcut names
     // ------------------------------------------------------------------------
 
+    private static final Set<String> STATE_BACKEND_CAN_BE_PROXIED =
+            new HashSet<>(
+                    Arrays.asList(
+                            "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",

Review comment:
       You can check how the RocksDB backend is loaded; it is referenced by class name as well, and I guess for the same reason.
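   
   For illustration, a hedged sketch of class-name-based, reflective instantiation; a real loader would typically go through a factory, and the names here are assumptions:

```java
/**
 * Sketch only: instantiating a class by its fully qualified name, which is what
 * removes the need for a compile-time dependency on the module that contains it.
 */
public final class ReflectiveLoadingSketch {

    static Object instantiateByName(String className, ClassLoader classLoader)
            throws ReflectiveOperationException {
        // Resolve the class at runtime; the caller's classpath must contain it.
        Class<?> clazz = Class.forName(className, false, classLoader);
        // Simplification: call the no-argument constructor to stay self-contained.
        return clazz.getDeclaredConstructor().newInstance();
    }

    private ReflectiveLoadingSketch() {}
}
```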




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r575781157



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -47,6 +52,13 @@
     //  Configuration shortcut names
     // ------------------------------------------------------------------------
 
+    private static final Set<String> STATE_BACKEND_CAN_BE_PROXIED =
+            new HashSet<>(
+                    Arrays.asList(
+                            "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",

Review comment:
       You can check how rocks db is loaded, it is using the class name as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r578323771



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
##########
@@ -99,23 +101,30 @@ private InternalTimeServiceManagerImpl(
      * <p><b>IMPORTANT:</b> Keep in sync with {@link InternalTimeServiceManager.Provider}.
      */
     public static <K> InternalTimeServiceManagerImpl<K> create(
-            CheckpointableKeyedStateBackend<K> keyedStatedBackend,
+            CheckpointableKeyedStateBackend<K> keyedStateBackend,
             ClassLoader userClassloader,
             KeyContext keyContext,
             ProcessingTimeService processingTimeService,
             Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates)
             throws Exception {
-        final KeyGroupRange keyGroupRange = keyedStatedBackend.getKeyGroupRange();
+        final KeyGroupRange keyGroupRange = keyedStateBackend.getKeyGroupRange();
+
+        KeyedStateBackend<?> rootKeyedStateBackend =
+                keyedStateBackend instanceof ProxyKeyedStateBackend
+                        ? ((ProxyKeyedStateBackend<?>) keyedStateBackend)
+                                .getProxiedKeyedStateBackend()
+                        : keyedStateBackend;
+
         final boolean requiresSnapshotLegacyTimers =
-                keyedStatedBackend instanceof AbstractKeyedStateBackend
-                        && ((AbstractKeyedStateBackend<K>) keyedStatedBackend)
+                rootKeyedStateBackend instanceof AbstractKeyedStateBackend
+                        && ((AbstractKeyedStateBackend<K>) rootKeyedStateBackend)

Review comment:
       Yes, that's a good idea
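   
   For context, a stand-alone sketch of the proxy-unwrapping pattern used in the diff above; the types are simplified stand-ins, not the real Flink interfaces:

```java
/** Sketch of unwrapping a proxied backend to reach the root backend. */
public class UnwrapProxySketch {

    interface KeyedBackend {}

    /** Stand-in for ProxyKeyedStateBackend; only the accessor matters here. */
    interface ProxyBackend extends KeyedBackend {
        KeyedBackend getProxiedKeyedStateBackend();
    }

    /** Returns the innermost non-proxy backend, unwrapping any proxies on the way. */
    static KeyedBackend unwrap(KeyedBackend backend) {
        while (backend instanceof ProxyBackend) {
            backend = ((ProxyBackend) backend).getProxiedKeyedStateBackend();
        }
        return backend;
    }
}
```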




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567861579



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,

Review comment:
       1. I do not think it is overly complicated; it uses a map to store the creation factories instead of an if-else chain. To me, these are just different implementation styles. Is complexity the specific reason you do not like this approach?
   
   2. I slightly prefer this way because I want the wrapper to provide at least the same level of flexibility and extensibility as the wrapped backend.
   
   3. As I said, if you think it is not necessary, we can switch to the non-dynamic way, but then the wrapped versions should be changed accordingly as well, so that they stay at the same level of extensibility.
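   
   For comparison, a sketch of what the non-dynamic if-else alternative from point 3 might look like; descriptor and state types are simplified stand-ins:

```java
/** Sketch of the if-else alternative; the types are stand-ins for the real ones. */
public class IfElseDispatchSketch {

    static class StateDescriptor {}
    static class ValueStateDescriptor extends StateDescriptor {}
    static class ListStateDescriptor extends StateDescriptor {}

    interface State {}
    static class ProxyValueState implements State {}
    static class ProxyListState implements State {}

    static State createProxyState(StateDescriptor descriptor) {
        if (descriptor instanceof ValueStateDescriptor) {
            return new ProxyValueState();
        } else if (descriptor instanceof ListStateDescriptor) {
            return new ProxyListState();
        }
        // Supporting a new state kind means extending this chain by hand,
        // which is the extensibility trade-off being discussed.
        throw new IllegalArgumentException(
                "Unsupported state descriptor: " + descriptor.getClass().getName());
    }
}
```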




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633) 
   * a6e2884666b069a67c935809aff84a2fc15d20af UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12652",
       "triggerID" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "89066112157569ad58908e401696b45a56f48fc3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12693",
       "triggerID" : "89066112157569ad58908e401696b45a56f48fc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc4cd266c548473fdd31bad5018ae210048d0472",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12831",
       "triggerID" : "bc4cd266c548473fdd31bad5018ae210048d0472",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bc4cd266c548473fdd31bad5018ae210048d0472 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12831) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769547636


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6 (Fri Jan 29 03:21:18 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **Invalid pull request title: No valid Jira ID provided**
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r568316262



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,

Review comment:
       > 1. Yes, because of unnecessary complexity (plus `instanceof` is more flexible than keys of `class` type).
   > 2. Can you explain why is it more flexible and extensible than just `if-else`?. And why the same level of flexibility and extensibility is needed in wrapping and wrapped backends?
   > 3. I'd prefer such a refactoring (if any) to be performed before adding a new backend
   
   **Why it is more flexible and extensible:**
   As you said, the mappings can be updated dynamically or programmatically, 
   e.g. `mapper.put(new type, new factory)`, 
   and it is also easy to check whether `mapper.get(a type)` exists. 
   
   That is something an if-else chain cannot do as simply and cleanly.
   
   **Why the same level of flexibility and extensibility:**
   If the underlying backend can have a new type added dynamically, as it can now, then I want the wrapper to support "dynamically adding a new type" as well.
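   
   A sketch of the map-based dispatch being argued for here, with simplified stand-in types; it shows the programmatic registration and the explicit existence check:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch only; descriptor and state types are stand-ins for the real ones. */
public class RegistryDispatchSketch {

    static class StateDescriptor {}
    static class ValueStateDescriptor extends StateDescriptor {}

    interface State {}
    static class ProxyValueState implements State {}

    private static final Map<Class<? extends StateDescriptor>, Supplier<State>> STATE_FACTORIES =
            new HashMap<>();

    static {
        STATE_FACTORIES.put(ValueStateDescriptor.class, ProxyValueState::new);
    }

    /** "Dynamically add a new type": register another factory at runtime. */
    static void register(Class<? extends StateDescriptor> type, Supplier<State> factory) {
        STATE_FACTORIES.put(type, factory);
    }

    static State create(StateDescriptor descriptor) {
        Supplier<State> factory = STATE_FACTORIES.get(descriptor.getClass());
        if (factory == null) {
            // A single lookup is enough to detect an unsupported descriptor type.
            throw new IllegalArgumentException(
                    "No proxy state registered for " + descriptor.getClass().getName());
        }
        return factory.get();
    }
}
```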




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r577887482



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -47,6 +52,13 @@
     //  Configuration shortcut names
     // ------------------------------------------------------------------------
 
+    private static final Set<String> STATE_BACKEND_CAN_BE_PROXIED =
+            new HashSet<>(
+                    Arrays.asList(
+                            "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",

Review comment:
       But what is the reason to limit proxying to these classes in the first place? Why not proxy everything?
   Or, if we want to avoid double-proxying, maybe check for that specific class instead?
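   
   A sketch of the "check the specific class to avoid double-proxying" idea; the names are assumptions, not the real Flink code:

```java
/** Sketch only: never wrap an already-proxied backend a second time. */
public class DoubleProxyGuardSketch {

    interface StateBackend {}

    static class ProxyStateBackend implements StateBackend {
        private final StateBackend delegate;

        ProxyStateBackend(StateBackend delegate) {
            this.delegate = delegate;
        }

        StateBackend getDelegate() {
            return delegate;
        }
    }

    /** Wraps any backend, but returns an existing proxy unchanged. */
    static StateBackend proxyIfNeeded(StateBackend backend) {
        if (backend instanceof ProxyStateBackend) {
            return backend; // already proxied, do not wrap again
        }
        return new ProxyStateBackend(backend);
    }
}
```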




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12652",
       "triggerID" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "89066112157569ad58908e401696b45a56f48fc3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12693",
       "triggerID" : "89066112157569ad58908e401696b45a56f48fc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bc4cd266c548473fdd31bad5018ae210048d0472",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12831",
       "triggerID" : "bc4cd266c548473fdd31bad5018ae210048d0472",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0b3d7cc3dc22cc1ba08d6c37c9d50aee3bc1ee46",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0b3d7cc3dc22cc1ba08d6c37c9d50aee3bc1ee46",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bc4cd266c548473fdd31bad5018ae210048d0472 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12831) 
   * 0b3d7cc3dc22cc1ba08d6c37c9d50aee3bc1ee46 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633) 
   * a6e2884666b069a67c935809aff84a2fc15d20af Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bdcecae34970cd83644f10cb5ba179458269854d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642) 
   * cf913a7fa26e306a78febf0ad7b5fb9e8d48288d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a6e2884666b069a67c935809aff84a2fc15d20af Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636) 
   * c88255d770aa864a14f9aa2fc52d63a10e045f23 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur closed pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur closed pull request #14799:
URL: https://github.com/apache/flink/pull/14799


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12652",
       "triggerID" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "89066112157569ad58908e401696b45a56f48fc3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "89066112157569ad58908e401696b45a56f48fc3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7ee60d64feb3da9e9fc24e08482260d4b021c65f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12652) 
   * 89066112157569ad58908e401696b45a56f48fc3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r568316262



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,

Review comment:
       > 1. Yes, because of unnecessary complexity (plus `instanceof` is more flexible than keys of `class` type).
   > 2. Can you explain why it is more flexible and extensible than just `if-else`? And why is the same level of flexibility and extensibility needed in both the wrapping and the wrapped backends?
   > 3. I'd prefer such a refactoring (if any) to be performed before adding a new backend
   
   **Why it is more flexible and extensible:**
   As you said, the mappings can be updated dynamically or programmatically,
   e.g. `mapper.put(newType, newFactory)`;
   it is also easy to check whether a mapping exists via `mapper.get(type)`.
   
   That is something `if-else` cannot do as simply and cleanly (see the sketch below).
   
   **Why the same level of flexibility and extensibility:**
   If the underlying backend supports dynamically adding a new type, as it does today, I want the wrapper to support dynamically adding a new type as well.
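   
   A minimal sketch of the registration/lookup pattern I mean, reusing the `StateFactory` interface and `Proxy*State::create` factories from this PR; the registry itself is illustrative only, not the actual implementation:
   ```
   // Illustrative only: a mutable registry that can be extended without touching dispatch code.
   Map<Class<? extends StateDescriptor>, StateFactory> factories = new HashMap<>();
   factories.put(ValueStateDescriptor.class, (StateFactory) ProxyValueState::create);
   factories.put(ListStateDescriptor.class, (StateFactory) ProxyListState::create);
   
   // A new state type can be registered dynamically:
   factories.put(MapStateDescriptor.class, (StateFactory) ProxyMapState::create);
   
   // Lookup doubles as an explicit "is this descriptor supported?" check:
   StateFactory factory = factories.get(stateDesc.getClass());
   if (factory == null) {
       throw new FlinkRuntimeException(
               "Unsupported state descriptor: " + stateDesc.getClass().getName());
   }
   ```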




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r566996438



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,

Review comment:
       I would prefer code like this:
   ```
   InternalKvState nested =
           keyedStateBackend.createInternalState(
                   namespaceSerializer, stateDesc, snapshotTransformFactory);
   if (stateDesc instanceof ValueStateDescriptor) {
       return new ProxyValueState((InternalValueState) nested);
   } else if (stateDesc instanceof ListStateDescriptor) {
       return ...
   }
   ```
   It would be simpler and shorter.
   
   I think that `Map` can be useful when the mappings can be updated dynamically or programmatically. Neither is the case.
   The factory interface could then be dropped, as it's only needed to work with the map, IIUC.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ProxyListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ProxyReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ProxyAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class, (StateFactory) ProxyMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    public ProxyKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend) {
+        super(
+                keyedStateBackend.kvStateRegistry,
+                keyedStateBackend.keySerializer,
+                keyedStateBackend.userCodeClassLoader,
+                keyedStateBackend.executionConfig,
+                keyedStateBackend.ttlTimeProvider,
+                keyedStateBackend.cancelStreamRegistry,
+                keyedStateBackend.keyGroupCompressionDecorator,
+                keyedStateBackend.keyContext);
+        this.keyedStateBackend = keyedStateBackend;

Review comment:
       At runtime, we end up with 4 backends:
   ```
   Proxy extends Abstract (has its own fields AND refers to passed fields)
       Proxied extends Abstract (has its own fields AND refers to passed fields)
   ```
   Right?
   
   I see the following drawbacks:
   1. Double caching and lookup in `keyValueStatesByName`
   2. Error-prone: 4 backend classes refer to the same `kvStateRegistry`, `keySerializer`, etc., making it easy to either call some method twice or not call it for the wrapped backend (e.g. `setCurrentKey` isn't called for the wrapped backend)
   3. To me this hierarchy is very confusing
   
   Why not refactor this in some way or another? Could `AbstractKeyedStateBackend` also be a proxy (i.e. replace inheritance with delegation)?
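   
   For illustration, a stripped-down sketch of the delegation alternative; `SimpleKeyedBackend` is a made-up, minimal stand-in, not Flink's real backend interfaces:
   ```
   // Illustrative only: the real interfaces have many more methods; this just shows
   // "delegation instead of inheritance".
   interface SimpleKeyedBackend<K> {
       void setCurrentKey(K key);
       K getCurrentKey();
   }
   
   // The wrapper owns no backend fields of its own; every call goes to exactly one delegate,
   // so there is no second copy of kvStateRegistry/keySerializer that could get out of sync.
   class DelegatingKeyedBackend<K> implements SimpleKeyedBackend<K> {
       private final SimpleKeyedBackend<K> delegate;
   
       DelegatingKeyedBackend(SimpleKeyedBackend<K> delegate) {
           this.delegate = delegate;
       }
   
       @Override
       public void setCurrentKey(K key) {
           delegate.setCurrentKey(key); // forwarded exactly once
       }
   
       @Override
       public K getCurrentKey() {
           return delegate.getCurrentKey();
       }
   }
   ```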

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
##########
@@ -44,6 +44,15 @@
                     .noDefaultValue()
                     .withDescription("The state backend to be used to checkpoint state.");
 
+    /** Whether to enable state change log. */
+    // @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
+            ConfigOptions.key("state.backend.enable-statechangelog")

Review comment:
       I think this option should be hidden.
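   
   One possible way to do that, sketched below; this assumes the `@Documentation.ExcludeFromDocumentation` annotation from `flink-annotations` and a boolean option with a `false` default, both of which should be double-checked against the actual code:
   ```
   // Sketch only: keep the option out of the generated documentation instead of
   // commenting out the section annotation. Annotation name, default value and
   // description wording are assumptions.
   @Documentation.ExcludeFromDocumentation
   public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
           ConfigOptions.key("state.backend.enable-statechangelog")
                   .booleanType()
                   .defaultValue(false)
                   .withDescription("Whether to forward state changes to the state change log.");
   ```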




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567551460



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,

Review comment:
       I do not have a strong preference in general; simpler is better.
   
   However, I am constructing the factory map this way to follow the same style/functionality that the existing keyed state backends use, since this is a wrapper for them.
   
   If you prefer to make them simpler, we should probably change all of them at once. Having two different styles may lead to more confusion (as the TTL wrapper already does, in my view), but I guess we can do that in a different PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bdcecae34970cd83644f10cb5ba179458269854d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642) 
   * cf913a7fa26e306a78febf0ad7b5fb9e8d48288d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * cf913a7fa26e306a78febf0ad7b5fb9e8d48288d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633) 
   * a6e2884666b069a67c935809aff84a2fc15d20af Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636) 
   * c88255d770aa864a14f9aa2fc52d63a10e045f23 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12652",
       "triggerID" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "89066112157569ad58908e401696b45a56f48fc3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12693",
       "triggerID" : "89066112157569ad58908e401696b45a56f48fc3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 89066112157569ad58908e401696b45a56f48fc3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12693) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-770546162


   Thank you so much for reviewing, @rkhachatryan !


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r577882323



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class ProxyKeyedStateBackend<K>
+        implements CheckpointableKeyedStateBackend<K>, CheckpointListener {
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ProxyListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ProxyReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ProxyAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class, (StateFactory) ProxyMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    // ==============================================================
+    //  cache maintained by the proxyKeyedStateBackend itself
+    //  not the same as the underlying wrapped keyedStateBackend
+    //  InternalKvState is a ProxyXXState, XX stands for Value, List ...
+    /** So that we can give out state when the user uses the same key. */
+    protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+
+    @SuppressWarnings("rawtypes")
+    protected InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    protected String lastName;
+
+    // ==============================================================
+    // ==== the same as the wrapped keyedStateBackend
+
+    public final ExecutionConfig executionConfig;
+
+    public final TtlTimeProvider ttlTimeProvider;
+
+    public ProxyKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, T> stateDescriptor,
+            KeyedStateFunction<K, S> function)
+            throws Exception {
+        try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
+
+            final S state = getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
+
+            if (keyedStateBackend.supportConcurrentModification()) {

Review comment:
       Got it. I was reading "support" as "can be modified concurrently".
   Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567978393



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,

Review comment:
       1. Yes, because of unnecessary complexity (plus `instanceof` is more flexible than keys of `class` type).
   2. Can you explain why it is more flexible and extensible than just `if-else`?
   3. And why is the same level of flexibility and extensibility needed in both the wrapping and the wrapped backends?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567551460



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** */
+public class ProxyKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+    // wrapped keyed state backend, either HeapKeyedStateBackend or RocksDBKeyedStateBackend
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,

Review comment:
       I do not have a strong preference in general; simpler is better.
   
   However, I am constructing the factory map this way to follow the same style/functionality that the existing keyed state backends use, since this is a wrapper for them.
   
   If you prefer to make them simpler, we should probably change all of them at once. Having two different styles may lead to more confusion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r570555194



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class ProxyKeyedStateBackend<K>
+        implements CheckpointableKeyedStateBackend<K>, CheckpointListener {
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ProxyListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ProxyReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ProxyAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class, (StateFactory) ProxyMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    // ==============================================================
+    //  cache maintained by the proxyKeyedStateBackend itself
+    //  not the same as the underlying wrapped keyedStateBackend
+    //  InternalKvState is a ProxyXXState, XX stands for Value, List ...
+    /** So that we can give out state when the user uses the same key. */
+    protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+
+    @SuppressWarnings("rawtypes")
+    protected InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    protected String lastName;
+
+    // ==============================================================
+    // ==== the same as the wrapped keyedStateBackend
+
+    public final ExecutionConfig executionConfig;
+
+    public final TtlTimeProvider ttlTimeProvider;
+
+    public ProxyKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, T> stateDescriptor,
+            KeyedStateFunction<K, S> function)
+            throws Exception {
+        try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
+
+            final S state = getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
+
+            if (keyedStateBackend.supportConcurrentModification()) {

Review comment:
       Shouldn't the name be `notSupportConcurrentModification` (i.e. reversed)?
   If it is true, then we iterate without copying to a list.
   
   Just out of curiosity: did you actually get an exception here with HeapStateBackend? Which test?
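   
   For context, a minimal sketch of the copy-first strategy under discussion, reusing the names from the snippet above; this is illustrative, not the exact Flink code:
   ```
   // Illustrative sketch: when writes performed by `function` may invalidate the backend's
   // key iterator (e.g. a heap-based backend), the keys are snapshotted into a list before
   // the function is applied; backends whose iterators tolerate concurrent writes could
   // consume the key stream lazily instead.
   try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
       final S state = getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
       final List<K> keys = keyStream.collect(Collectors.toList()); // copy first
       for (K key : keys) {
           setCurrentKey(key);
           function.process(key, state);
       }
   }
   ```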

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -204,6 +216,11 @@ public static StateBackend fromApplicationOrConfigOrDefault(
 
         // (1) the application defined state backend has precedence
         if (fromApplication != null) {
+
+            checkArgument(
+                    !(fromApplication instanceof ProxyStateBackend),

Review comment:
       This will not detect `new StubBackend(new ProxyBackend)` :)
   I suggested above adding an `unwrap` method, which could also help here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -242,6 +259,34 @@ public static StateBackend fromApplicationOrConfigOrDefault(
         return backend;
     }
 
+    public static StateBackend loadStateBackend(
+            @Nullable StateBackend fromApplication,
+            Configuration config,
+            ClassLoader classLoader,
+            @Nullable Logger logger)
+            throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+        final StateBackend backend =
+                fromApplicationOrConfigOrDefault(fromApplication, config, classLoader, logger);
+
+        if (needToProxyStateBackend(backend, config)) {
+            LOG.info(
+                    "Proxy State Backend used, and the root State Backend is {}",
+                    backend.getClass().getSimpleName());
+            return new ProxyStateBackend(backend);

Review comment:
       I think this class is the right place to decide and create `ProxyStateBackend` :+1: 

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
##########
@@ -99,23 +101,30 @@ private InternalTimeServiceManagerImpl(
      * <p><b>IMPORTANT:</b> Keep in sync with {@link InternalTimeServiceManager.Provider}.
      */
     public static <K> InternalTimeServiceManagerImpl<K> create(
-            CheckpointableKeyedStateBackend<K> keyedStatedBackend,
+            CheckpointableKeyedStateBackend<K> keyedStateBackend,
             ClassLoader userClassloader,
             KeyContext keyContext,
             ProcessingTimeService processingTimeService,
             Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates)
             throws Exception {
-        final KeyGroupRange keyGroupRange = keyedStatedBackend.getKeyGroupRange();
+        final KeyGroupRange keyGroupRange = keyedStateBackend.getKeyGroupRange();
+
+        KeyedStateBackend<?> rootKeyedStateBackend =
+                keyedStateBackend instanceof ProxyKeyedStateBackend
+                        ? ((ProxyKeyedStateBackend<?>) keyedStateBackend)
+                                .getProxiedKeyedStateBackend()
+                        : keyedStateBackend;
+
         final boolean requiresSnapshotLegacyTimers =
-                keyedStatedBackend instanceof AbstractKeyedStateBackend
-                        && ((AbstractKeyedStateBackend<K>) keyedStatedBackend)
+                rootKeyedStateBackend instanceof AbstractKeyedStateBackend
+                        && ((AbstractKeyedStateBackend<K>) rootKeyedStateBackend)

Review comment:
       WDYT about pulling `requiresLegacySynchronousTimerSnapshots` to the interface?
   Then, with `unwrap` added, we could just call
   `rootKeyedStateBackend.unwrap().requiresLegacySynchronousTimerSnapshots()`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -162,14 +163,19 @@ public static CheckpointStorage load(
         Preconditions.checkNotNull(classLoader, "classLoader");
         Preconditions.checkNotNull(configuredStateBackend, "statebackend");
 
-        if (configuredStateBackend instanceof CheckpointStorage) {
+        StateBackend rootStateBackend =
+                (configuredStateBackend instanceof ProxyStateBackend)
+                        ? ((ProxyStateBackend) configuredStateBackend).getProxiedStateBackend()
+                        : configuredStateBackend;

Review comment:
       I see this fragment several times in your PR. If we later add one more layer (say, logging or a new TTL implementation), this can break (we already have `StubStateBackend`).
   
   WDYT about adding a `default StateBackend unwrap()` method to the interface?
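   
   A minimal sketch of that idea; illustrative only, other interface methods are omitted, and the recursive unwrapping is my assumption, not part of this PR:
   ```
   // StateBackend.java (sketch): a default method lets callers reach the innermost
   // backend without instanceof chains; wrappers override it to return what they wrap.
   public interface StateBackend {
       // ... existing factory methods omitted for brevity
   
       /** Returns the wrapped backend, or {@code this} if nothing is wrapped. */
       default StateBackend unwrap() {
           return this;
       }
   }
   
   // ProxyStateBackend.java (sketch):
   public class ProxyStateBackend implements StateBackend {
       private final StateBackend delegate;
   
       public ProxyStateBackend(StateBackend delegate) {
           this.delegate = delegate;
       }
   
       @Override
       public StateBackend unwrap() {
           // unwrap recursively so nested wrappers (e.g. a StubStateBackend around a proxy) resolve too
           return delegate.unwrap();
       }
   }
   ```
   Call sites like `CheckpointStorageLoader` could then use `configuredStateBackend.unwrap()` instead of checking for `ProxyStateBackend` explicitly.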

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -47,6 +52,13 @@
     //  Configuration shortcut names
     // ------------------------------------------------------------------------
 
+    private static final Set<String> STATE_BACKEND_CAN_BE_PROXIED =
+            new HashSet<>(
+                    Arrays.asList(
+                            "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",

Review comment:
       Why do we want to limit proxying to these classes?
   Besides that, hardcoding class names like this is a bit fragile (`RocksDBStateBackend.class`?)
   
   At least one upcoming PR already adds new classes that should be proxied.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
##########
@@ -31,7 +31,7 @@
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
  */
-class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V>
+public class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V>

Review comment:
       Why is this change needed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r575781718



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class ProxyKeyedStateBackend<K>
+        implements CheckpointableKeyedStateBackend<K>, CheckpointListener {
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ProxyListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ProxyReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ProxyAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class, (StateFactory) ProxyMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    // ==============================================================
+    //  cache maintained by the proxyKeyedStateBackend itself
+    //  not the same as the underlying wrapped keyedStateBackend
+    //  InternalKvState is a ProxyXXState, XX stands for Value, List ...
+    /** So that we can give out state when the user uses the same key. */
+    protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+
+    @SuppressWarnings("rawtypes")
+    protected InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    protected String lastName;
+
+    // ==============================================================
+    // ==== the same as the wrapped keyedStateBackend
+
+    public final ExecutionConfig executionConfig;
+
+    public final TtlTimeProvider ttlTimeProvider;
+
+    public ProxyKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, T> stateDescriptor,
+            KeyedStateFunction<K, S> function)
+            throws Exception {
+        try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
+
+            final S state = getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
+
+            if (keyedStateBackend.supportConcurrentModification()) {

Review comment:
       If the backend supports concurrent modification, you do not need to collect the full list of keys up front (the iterator is not invalidated by modifications made while iterating).
   
   Yes, there is a test case for this: `StateBackendTestBase#testConcurrentModificationWithApplyToAllKeys`.
   The Jira ticket that introduced this test explains in more detail what exactly "ConcurrentModification" refers to: concurrent modification of the key-value entries of the state. The heap StateTable is backed by a hash-table-like structure (I do not remember the exact details), which does not support concurrent modification.
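
   To make the trade-off concrete, here is a rough sketch of the two branches being discussed; `applyOnKey(...)` and the boolean parameter are simplified stand-ins for the real backend internals (set the current key, then call the `KeyedStateFunction`), reusing the types imported in the quoted file plus `java.util.Iterator`:

   ```java
   private <N, S extends State, T> void applyToAllKeysSketch(
           N namespace,
           TypeSerializer<N> namespaceSerializer,
           StateDescriptor<S, T> stateDescriptor,
           KeyedStateFunction<K, S> function,
           boolean supportsConcurrentModification)
           throws Exception {
       try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
           final S state = getPartitionedState(namespace, namespaceSerializer, stateDescriptor);

           if (supportsConcurrentModification) {
               // RocksDB-style backends tolerate writes during iteration,
               // so the keys can be streamed lazily without an extra copy.
               for (Iterator<K> it = keyStream.iterator(); it.hasNext(); ) {
                   applyOnKey(it.next(), state, function); // hypothetical helper:
                   // setCurrentKey(key); function.process(key, state);
               }
           } else {
               // The heap StateTable does not, so materialize the key set first
               // and only then run the (possibly mutating) function per key.
               List<K> keys = keyStream.collect(Collectors.toList());
               for (K key : keys) {
                   applyOnKey(key, state, function);
               }
           }
       }
   }
   ```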




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12652",
       "triggerID" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7ee60d64feb3da9e9fc24e08482260d4b021c65f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12652) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12647",
       "triggerID" : "cf913a7fa26e306a78febf0ad7b5fb9e8d48288d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12652",
       "triggerID" : "7ee60d64feb3da9e9fc24e08482260d4b021c65f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "89066112157569ad58908e401696b45a56f48fc3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12693",
       "triggerID" : "89066112157569ad58908e401696b45a56f48fc3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7ee60d64feb3da9e9fc24e08482260d4b021c65f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12652) 
   * 89066112157569ad58908e401696b45a56f48fc3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12693) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c88255d770aa864a14f9aa2fc52d63a10e045f23 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638) 
   * bdcecae34970cd83644f10cb5ba179458269854d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567641769



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
##########
@@ -44,6 +44,15 @@
                     .noDefaultValue()
                     .withDescription("The state backend to be used to checkpoint state.");
 
+    /** Whether to enable state change log. */
+    // @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
+            ConfigOptions.key("state.backend.enable-statechangelog")

Review comment:
       I mean we should hide it from users for now (with `@Documentation.ExcludeFromDocumentation`).
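
   For reference, a sketch of what hiding it could look like; the default value and description below are placeholders rather than taken from the PR, and `Documentation` comes from `org.apache.flink.annotation.docs`:

   ```java
   /** Whether to enable the state change log (hidden from the generated docs for now). */
   @Documentation.ExcludeFromDocumentation
   public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
           ConfigOptions.key("state.backend.enable-statechangelog")
                   .booleanType()
                   .defaultValue(false) // placeholder default, not from the PR
                   .withDescription("Whether to forward state changes to the state change log.");
   ```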




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r575780979



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -204,6 +216,11 @@ public static StateBackend fromApplicationOrConfigOrDefault(
 
         // (1) the application defined state backend has precedence
         if (fromApplication != null) {
+
+            checkArgument(
+                    !(fromApplication instanceof ProxyStateBackend),

Review comment:
       I've added an abstract class to avoid this.
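
   Purely as a sketch of the idea (the class actually added in the PR may look different), a common abstract parent for wrapping backends lets loaders detect any wrapper without enumerating concrete proxy classes:

   ```java
   // Sketch only; name and shape are assumptions, not the code from the PR.
   public abstract class AbstractWrappingStateBackend implements StateBackend {

       protected final StateBackend delegate;

       protected AbstractWrappingStateBackend(StateBackend delegate) {
           this.delegate = delegate;
       }

       public StateBackend getDelegatedStateBackend() {
           return delegate;
       }
   }
   ```

   The loader check could then be written against the abstract parent instead of the concrete `ProxyStateBackend`.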




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r575781718



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/proxy/ProxyKeyedStateBackend.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.proxy;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class ProxyKeyedStateBackend<K>
+        implements CheckpointableKeyedStateBackend<K>, CheckpointListener {
+    AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ProxyValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ProxyListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ProxyReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ProxyAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class, (StateFactory) ProxyMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    // ==============================================================
+    //  cache maintained by the proxyKeyedStateBackend itself
+    //  not the same as the underlying wrapped keyedStateBackend
+    //  InternalKvState is a ProxyXXState, XX stands for Value, List ...
+    /** So that we can give out state when the user uses the same key. */
+    protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+
+    @SuppressWarnings("rawtypes")
+    protected InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    protected String lastName;
+
+    // ==============================================================
+    // ==== the same as the wrapped keyedStateBackend
+
+    public final ExecutionConfig executionConfig;
+
+    public final TtlTimeProvider ttlTimeProvider;
+
+    public ProxyKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, T> stateDescriptor,
+            KeyedStateFunction<K, S> function)
+            throws Exception {
+        try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
+
+            final S state = getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
+
+            if (keyedStateBackend.supportConcurrentModification()) {

Review comment:
       If the backend supports concurrent modification, you do not need to collect the full list of keys up front (the iterator is not invalidated by modifications made while iterating); that is what this branch of the logic means.
   
   As for a test case: yes,
   `StateBackendTestBase#testConcurrentModificationWithApplyToAllKeys`.
   
   The Jira ticket that introduced this test gives more details on what exactly "ConcurrentModification" refers to: concurrent modification of the key-value entries of the state. The heap StateTable is backed by a hash-table-like structure (I do not remember the exact details), which does not support concurrent modification.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r567563070



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
##########
@@ -44,6 +44,15 @@
                     .noDefaultValue()
                     .withDescription("The state backend to be used to checkpoint state.");
 
+    /** Whether to enable state change log. */
+    // @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
+    public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
+            ConfigOptions.key("state.backend.enable-statechangelog")

Review comment:
       Do you mean we should hide it from users for now, or that we should not use this option at all?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14799: [WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14799:
URL: https://github.com/apache/flink/pull/14799#issuecomment-769554549


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12633",
       "triggerID" : "fedc93c6d1bccd231d8cf5742c1ffbedd41b78f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12636",
       "triggerID" : "a6e2884666b069a67c935809aff84a2fc15d20af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638",
       "triggerID" : "c88255d770aa864a14f9aa2fc52d63a10e045f23",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642",
       "triggerID" : "bdcecae34970cd83644f10cb5ba179458269854d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c88255d770aa864a14f9aa2fc52d63a10e045f23 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12638) 
   * bdcecae34970cd83644f10cb5ba179458269854d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12642) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] curcur commented on a change in pull request #14799: [FLINK-21354][WIP] Wrapped StateBackend to forward state changes to StateChangleLog

Posted by GitBox <gi...@apache.org>.
curcur commented on a change in pull request #14799:
URL: https://github.com/apache/flink/pull/14799#discussion_r577316437



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -47,6 +52,13 @@
     //  Configuration shortcut names
     // ------------------------------------------------------------------------
 
+    private static final Set<String> STATE_BACKEND_CAN_BE_PROXIED =
+            new HashSet<>(
+                    Arrays.asList(
+                            "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",

Review comment:
       But I think you make a good point here: we should not hardcode the can-be-proxied state backends.
    




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org