Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/10 06:05:47 UTC

[GitHub] [flink] fredia opened a new pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

fredia opened a new pull request #19033:
URL: https://github.com/apache/flink/pull/19033


   ## What is the purpose of the change
   
   Since FRocksDB was upgraded to 6.20.3 in Flink 1.14, `deleteRange()` is now usable in production.
   
   This change was previously worked on by @[lgo](https://github.com/lgo) in https://github.com/apache/flink/pull/14893 and highlighted by [@sihuazhou](https://github.com/sihuazhou) in [FLINK-8790](https://issues.apache.org/jira/browse/FLINK-8790). Because `deleteRange()` was marked as an experimental feature at the time, those earlier efforts were put on hold.
   
   This PR continues that work: it improves the performance of incremental rescaling in the RocksDB state backend by replacing **delete-key-by-key** with `deleteRange()` when clipping the base RocksDB instance.
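
For readers unfamiliar with the clipping step, the difference between the two strategies can be sketched with an in-memory analogy. This is not the PR's code and does not use the RocksDB API: a `java.util.TreeMap` keyed by key-group index stands in for the base DB, and the names `ClipSketch`, `clipKeyByKey` and `clipByRange` are hypothetical. The point is only that key-by-key clipping issues one delete per out-of-range entry, while range-based clipping issues at most two operations regardless of how many entries fall outside the target range.

```java
import java.util.Iterator;
import java.util.TreeMap;

// In-memory analogy only: a sorted map stands in for the base RocksDB instance.
class ClipSketch {

    /** Removes every entry outside [start, end] one at a time; returns the number of deletes. */
    static int clipKeyByKey(TreeMap<Integer, String> db, int start, int end) {
        int deletes = 0;
        Iterator<Integer> it = db.keySet().iterator();
        while (it.hasNext()) {
            int keyGroup = it.next();
            if (keyGroup < start || keyGroup > end) {
                it.remove(); // one delete per out-of-range entry
                deletes++;
            }
        }
        return deletes;
    }

    /** Removes everything outside [start, end] with at most two range operations. */
    static int clipByRange(TreeMap<Integer, String> db, int start, int end) {
        int rangeOps = 0;
        if (!db.isEmpty() && db.firstKey() < start) {
            db.headMap(start, false).clear(); // drop all keys below the target range
            rangeOps++;
        }
        if (!db.isEmpty() && db.lastKey() > end) {
            db.tailMap(end, false).clear(); // drop all keys above the target range
            rangeOps++;
        }
        return rangeOps;
    }
}
```

Both methods leave the same entries behind; only the number of delete operations differs, and that is the cost this PR aims to reduce during rescaling.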
   
   ## Brief change log
   
     - Replace **delete-key-by-key** with `deleteRange()` when clipping the base RocksDB instance during incremental rescaling.
     - Add ITCases for rescaling from checkpoints.
   
   
   ## Verifying this change
   
   This change adds tests and can be verified as follows:
   
     - The added ITCases cover scaling in and out from checkpoints.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / **don't know**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**yes** / no / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * 697f9aad6e5d3a630ccfb16dd813cbb36666da52 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097) 
   * 28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * d4fc20e109703ae85eb8a7786973824a8fb081ac Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207) 
   * dd92512b30f07e472d9e9e5377d0b4e801214d00 UNKNOWN
   



[GitHub] [flink] fredia commented on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
fredia commented on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1073415574


   CI passed at https://dev.azure.com/fredia/flink/_build/results?buildId=68&view=results





[GitHub] [flink] fredia edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
fredia edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1073462371


   @Aitozi Thank you for your interest. As you mentioned, 
   > we could always choose one as the base db, since the DeleteRange can finish in no time. 
   
   In theory, a default value of 0 is the most suitable, since `deleteRange()` takes less time than creating a new RocksDB instance and then scanning and putting the data. We ran some experiments with different thresholds and created a ticket about this, https://issues.apache.org/jira/browse/FLINK-26757; please take a look.
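
To make the role of the threshold concrete, here is a minimal sketch of how a base DB could be chosen. It rests on an assumption the comment does not spell out: that the threshold is a minimum overlap fraction between a restored handle's key-group range and the target range. The class `BaseHandleSketch`, its nested `Range` type and the method `chooseBase` are hypothetical names, not Flink's actual code.

```java
import java.util.List;

// Hypothetical sketch: pick the restored handle to reuse as the base DB.
class BaseHandleSketch {

    /** Inclusive key-group range; a stand-in for a real key-group range type. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        int size() {
            return end - start + 1;
        }

        /** Number of key groups shared with the other range. */
        int overlap(Range other) {
            int lo = Math.max(start, other.start);
            int hi = Math.min(end, other.end);
            return Math.max(0, hi - lo + 1);
        }
    }

    /**
     * Returns the index of the overlapping handle with the highest overlap fraction
     * that is at least the threshold, or -1 if no handle qualifies (in which case a
     * fresh DB would be created and filled by scan-and-put).
     */
    static int chooseBase(List<Range> handles, Range target, double threshold) {
        int best = -1;
        double bestFraction = -1.0;
        for (int i = 0; i < handles.size(); i++) {
            Range handle = handles.get(i);
            int shared = handle.overlap(target);
            if (shared == 0) {
                continue; // nothing reusable in this handle
            }
            double fraction = (double) shared / handle.size();
            if (fraction >= threshold && fraction > bestFraction) {
                best = i;
                bestFraction = fraction;
            }
        }
        return best;
    }
}
```

Under this assumption, a threshold of 0 means any overlapping handle can serve as the base DB, so the restore always clips an existing instance with `deleteRange()` instead of building a new one.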





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * 7d1dbec66de54a97fb4a1a9a9304806bbe4da9af Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * dd92512b30f07e472d9e9e5377d0b4e801214d00 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208) 
   * f6c2933a9edaeefd01729d7325298141894b0f35 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33290) 
   



[GitHub] [flink] Myasuka commented on a change in pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
Myasuka commented on a change in pull request #19033:
URL: https://github.com/apache/flink/pull/19033#discussion_r829757672



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.checkpointing.utils.RescalingTestUtils;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestUtils;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** Test checkpoint rescaling for incremental rocksdb. */
+public class RescaleCheckpointManuallyITCase {
+
+    private static final int NUM_TASK_MANAGERS = 2;
+    private static final int SLOTS_PER_TASK_MANAGER = 2;
+
+    private static MiniClusterWithClientResource cluster;
+    private File checkpointDir;
+
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Before
+    public void setup() throws Exception {
+        Configuration config = new Configuration();
+
+        checkpointDir = temporaryFolder.newFolder();
+
+        config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        config.setString(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(config)
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+                                .build());
+        cluster.before();
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(false, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(true, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(false, true);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(true, true);
+    }

Review comment:
       I think testing rescale-in and rescale-out is enough here.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
##########
@@ -0,0 +1,314 @@
+/** Test checkpoint rescaling for incremental rocksdb. */
+public class RescaleCheckpointManuallyITCase {

Review comment:
       This test should extend `TestLogger`.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
##########
@@ -0,0 +1,314 @@
+/** Test checkpoint rescaling for incremental rocksdb. */
+public class RescaleCheckpointManuallyITCase {
+
+    private static final int NUM_TASK_MANAGERS = 2;
+    private static final int SLOTS_PER_TASK_MANAGER = 2;
+
+    private static MiniClusterWithClientResource cluster;
+    private File checkpointDir;
+
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Before
+    public void setup() throws Exception {
+        Configuration config = new Configuration();
+
+        checkpointDir = temporaryFolder.newFolder();
+
+        config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        config.setString(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(config)
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+                                .build());
+        cluster.before();
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(false, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(true, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(false, true);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(true, true);
+    }
+
+    /**
+     * Tests that a job with purely keyed state can be restarted from a checkpoint with a different
+     * parallelism.
+     */
+    public void testCheckpointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism)
+            throws Exception {
+        final int numberKeys = 42;
+        final int numberElements = 1000;
+        final int numberElements2 = 500;
+        final int parallelism = scaleOut ? 3 : 4;
+        final int parallelism2 = scaleOut ? 4 : 3;
+        final int maxParallelism = 13;
+
+        cluster.before();
+
+        ClusterClient<?> client = cluster.getClusterClient();
+        String checkpointPath =
+                runJobAndGetCheckpoint(
+                        numberKeys,
+                        numberElements,
+                        parallelism,
+                        maxParallelism,
+                        client,
+                        checkpointDir);
+
+        assertNotNull(checkpointPath);
+
+        int restoreMaxParallelism =
+                deriveMaxParallelism ? JobVertex.MAX_PARALLELISM_DEFAULT : maxParallelism;
+
+        restoreAndAssert(
+                parallelism2,
+                restoreMaxParallelism,
+                maxParallelism,
+                numberKeys,
+                numberElements2,
+                numberElements + numberElements2,
+                client,
+                checkpointPath);
+    }
+
+    private static String runJobAndGetCheckpoint(
+            int numberKeys,
+            int numberElements,
+            int parallelism,
+            int maxParallelism,
+            ClusterClient<?> client,
+            File checkpointDir)
+            throws Exception {
+        try {
+            JobGraph jobGraph =
+                    createJobGraphWithKeyedState(
+                            parallelism, maxParallelism, numberKeys, numberElements, false, 100);
+            NotifyingDefiniteKeySource.countDownLatch = new CountDownLatch(parallelism);
+            client.submitJob(jobGraph).get();
+            NotifyingDefiniteKeySource.countDownLatch.await();
+
+            RescalingTestUtils.SubtaskIndexFlatMapper.workCompletedLatch.await();
+
+            // verify the current state
+            Set<Tuple2<Integer, Integer>> actualResult =
+                    RescalingTestUtils.CollectionSink.getElementsSet();
+
+            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
+
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+                expectedResult.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelism, parallelism, keyGroupIndex),
+                                numberElements * key));
+            }
+
+            assertEquals(expectedResult, actualResult);
+            NotifyingDefiniteKeySource.countDownLatch.await();
+
+            waitUntilExternalizedCheckpointCreated(checkpointDir);
+            client.cancel(jobGraph.getJobID()).get();
+            TestUtils.waitUntilCanceled(jobGraph.getJobID(), client);
+            return TestUtils.getMostRecentCompletedCheckpoint(checkpointDir).getAbsolutePath();
+        } finally {
+            RescalingTestUtils.CollectionSink.clearElementsSet();
+        }
+    }
+
+    private void restoreAndAssert(
+            int restoreParallelism,
+            int restoreMaxParallelism,
+            int maxParallelismBefore,
+            int numberKeys,
+            int numberElements,
+            int numberElementsExpect,
+            ClusterClient<?> client,
+            String restorePath)
+            throws Exception {
+        try {
+
+            JobGraph scaledJobGraph =
+                    createJobGraphWithKeyedState(
+                            restoreParallelism,
+                            restoreMaxParallelism,
+                            numberKeys,
+                            numberElements,
+                            true,
+                            100);
+
+            scaledJobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(restorePath));
+
+            submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
+
+            Set<Tuple2<Integer, Integer>> actualResult2 =
+                    RescalingTestUtils.CollectionSink.getElementsSet();
+
+            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
+
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex =
+                        KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelismBefore);
+                expectedResult2.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelismBefore, restoreParallelism, keyGroupIndex),
+                                key * numberElementsExpect));
+            }
+            assertEquals(expectedResult2, actualResult2);
+        } finally {
+            RescalingTestUtils.CollectionSink.clearElementsSet();
+        }
+    }
+
+    private static void waitUntilExternalizedCheckpointCreated(File checkpointDir)
+            throws InterruptedException, IOException {
+        while (true) {
+            Thread.sleep(50);
+            Optional<File> externalizedCheckpoint =
+                    TestUtils.getMostRecentCompletedCheckpointMaybe(checkpointDir);
+            if (externalizedCheckpoint.isPresent()) {
+                break;
+            }
+        }
+    }

Review comment:
       We can move this helper into `TestUtils` so that both `ResumeCheckpointManuallyITCase` and `RescaleCheckpointManuallyITCase` can use it.
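
A minimal sketch of what such a shared helper might look like. The class name `CheckpointWaitSketch`, the `Supplier` parameter, and the timeout are assumptions made for illustration and are not part of the PR; in the PR the lookup is `TestUtils.getMostRecentCompletedCheckpointMaybe(checkpointDir)` and the loop has no timeout.

```java
import java.io.File;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical sketch of a shared test helper; not the code that was merged. */
final class CheckpointWaitSketch {

    private CheckpointWaitSketch() {}

    /**
     * Polls the lookup every 50 ms until it yields a checkpoint directory.
     * Unlike the loop in the diff, it gives up after the timeout (an assumption
     * added here) instead of spinning forever.
     */
    static File waitUntilExternalizedCheckpointCreated(
            Supplier<Optional<File>> lookup, Duration timeout)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Optional<File> checkpoint = lookup.get();
            if (checkpoint.isPresent()) {
                return checkpoint.get();
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("No externalized checkpoint within " + timeout);
            }
            Thread.sleep(50);
        }
    }
}
```

Both IT cases could then pass their own lookup, for example `() -> TestUtils.getMostRecentCompletedCheckpointMaybe(checkpointDir)` wrapped to handle its `IOException`.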

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/RescalingTestUtils.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test utilities for rescaling. */
+public class RescalingTestUtils {
+
+    /** A parallel source with definite keys. */
+    public static class DefiniteKeySource extends RichParallelSourceFunction<Integer> {
+
+        private static final long serialVersionUID = -400066323594122516L;
+
+        private final int numberKeys;
+        private final int numberElements;
+        private final boolean terminateAfterEmission;
+
+        protected int counter = 0;
+
+        private boolean running = true;
+
+        public DefiniteKeySource(
+                int numberKeys, int numberElements, boolean terminateAfterEmission) {
+            this.numberKeys = numberKeys;
+            this.numberElements = numberElements;
+            this.terminateAfterEmission = terminateAfterEmission;
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            final Object lock = ctx.getCheckpointLock();
+            final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+            while (running) {
+
+                if (counter < numberElements) {
+                    synchronized (lock) {
+                        for (int value = subtaskIndex;
+                                value < numberKeys;
+                                value += getRuntimeContext().getNumberOfParallelSubtasks()) {
+                            ctx.collect(value);
+                        }
+                        counter++;
+                    }
+                } else {
+                    if (terminateAfterEmission) {
+                        running = false;
+                    } else {
+                        Thread.sleep(100);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    /** A flatMapper with the index of subtask. */
+    public static class SubtaskIndexFlatMapper
+            extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private static final long serialVersionUID = 5273172591283191348L;
+
+        public static CountDownLatch workCompletedLatch = new CountDownLatch(1);
+        private static AtomicInteger completedCheckpointNum;

Review comment:
       There is no need to make `completedCheckpointNum` static.
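
One possible reason to avoid the static field, shown as a stand-alone sketch in plain Java with no Flink dependencies. The class and method names are made up for illustration. A static field that is reassigned in the constructor is shared by every instance in the JVM, so creating a second instance resets the count seen by the first; an instance field keeps the counts separate.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-ins for the mapper under review; not code from the PR. */
final class StaticCounterSketch {

    private StaticCounterSketch() {}

    /** Mirrors the reviewed pattern: a static field reassigned in the constructor. */
    static final class WithStaticField {
        private static AtomicInteger completed;

        WithStaticField() {
            completed = new AtomicInteger(); // resets the count for ALL instances
        }

        void notifyCheckpointComplete() {
            completed.getAndIncrement();
        }

        int completedCheckpoints() {
            return completed.get();
        }
    }

    /** The shape the comment asks for: each instance owns its counter. */
    static final class WithInstanceField {
        private final AtomicInteger completed = new AtomicInteger();

        void notifyCheckpointComplete() {
            completed.getAndIncrement();
        }

        int completedCheckpoints() {
            return completed.get();
        }
    }
}
```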

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/RescalingTestUtils.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test utilities for rescaling. */
+public class RescalingTestUtils {
+
+    /** A parallel source with definite keys. */
+    public static class DefiniteKeySource extends RichParallelSourceFunction<Integer> {
+
+        private static final long serialVersionUID = -400066323594122516L;
+
+        private final int numberKeys;
+        private final int numberElements;
+        private final boolean terminateAfterEmission;
+
+        protected int counter = 0;
+
+        private boolean running = true;
+
+        public DefiniteKeySource(
+                int numberKeys, int numberElements, boolean terminateAfterEmission) {
+            this.numberKeys = numberKeys;
+            this.numberElements = numberElements;
+            this.terminateAfterEmission = terminateAfterEmission;
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            final Object lock = ctx.getCheckpointLock();
+            final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+            while (running) {
+
+                if (counter < numberElements) {
+                    synchronized (lock) {
+                        for (int value = subtaskIndex;
+                                value < numberKeys;
+                                value += getRuntimeContext().getNumberOfParallelSubtasks()) {
+                            ctx.collect(value);
+                        }
+                        counter++;
+                    }
+                } else {
+                    if (terminateAfterEmission) {
+                        running = false;
+                    } else {
+                        Thread.sleep(100);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    /** A flatMapper with the index of subtask. */
+    public static class SubtaskIndexFlatMapper
+            extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private static final long serialVersionUID = 5273172591283191348L;
+
+        public static CountDownLatch workCompletedLatch = new CountDownLatch(1);
+        private static AtomicInteger completedCheckpointNum;
+
+        private transient ValueState<Integer> counter;
+        private transient ValueState<Integer> sum;
+
+        private final int numberElements;
+
+        public SubtaskIndexFlatMapper(int numberElements) {
+            this.numberElements = numberElements;
+            this.completedCheckpointNum = new AtomicInteger();
+        }
+
+        @Override
+        public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out)
+                throws Exception {
+
+            int count = counter.value() + 1;
+            counter.update(count);
+
+            int s = sum.value() + value;
+            sum.update(s);
+
+            if (count % numberElements == 0) {
+                out.collect(Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), s));
+                workCompletedLatch.countDown();
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            // all managed, nothing to do.
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            counter =
+                    context.getKeyedStateStore()
+                            .getState(new ValueStateDescriptor<>("counter", Integer.class, 0));
+            sum =
+                    context.getKeyedStateStore()
+                            .getState(new ValueStateDescriptor<>("sum", Integer.class, 0));
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            completedCheckpointNum.getAndIncrement();
+        }
+    }
+
+    /** Sink for collecting results into a collection. */
+    public static class CollectionSink<IN> implements SinkFunction<IN> {
+
+        private static Set<Object> elements =
+                Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>());

Review comment:
       ```suggestion
           private static final Set<Object> elements =
                   Collections.newSetFromMap(new ConcurrentHashMap<>());
       ```
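
For reference, a stand-alone sketch of the suggested declaration in use. The class name `ConcurrentSetSketch` and the method `addFromThreads` are hypothetical. With the diamond operator the type arguments of `ConcurrentHashMap` are inferred from the target type `Set<Object>`, and the resulting set can be written to from several threads at once, which is presumably what a parallel sink needs.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; only the field declaration mirrors the suggestion. */
final class ConcurrentSetSketch {

    private static final Set<Object> ELEMENTS =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    private ConcurrentSetSketch() {}

    /** Adds disjoint integers from several threads and returns the final set size. */
    static int addFromThreads(int threads, int perThread) throws InterruptedException {
        ELEMENTS.clear();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int offset = t * perThread;
            workers[t] =
                    new Thread(
                            () -> {
                                for (int i = 0; i < perThread; i++) {
                                    ELEMENTS.add(offset + i);
                                }
                            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return ELEMENTS.size();
    }
}
```

`ConcurrentHashMap.newKeySet()` (Java 8 and later) would be an equivalent way to obtain such a set.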

##########
File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
##########
@@ -139,4 +142,11 @@ private static boolean hasMetadata(Path file) {
             return false; // should never happen
         }
     }
+
+    public static void waitUntilCanceled(JobID jobId, ClusterClient<?> client)

Review comment:
       I think `waitUntilJobCanceled` would be a clearer name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] Myasuka commented on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
Myasuka commented on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1073440855


   Merged into master as b6822f293e24263752419e49f8b7910f4e0464a8 and 1bf45b25791cc3fad8b7d0d863caa9b0eef9a87b





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7d1dbec66de54a97fb4a1a9a9304806bbe4da9af Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7d1dbec66de54a97fb4a1a9a9304806bbe4da9af Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807) 
   * 768a998a8989828f816cbdfa009737529ec9dbd0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] fredia commented on a change in pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
fredia commented on a change in pull request #19033:
URL: https://github.com/apache/flink/pull/19033#discussion_r829831794



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/RescalingTestUtils.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test utilities for rescaling. */
+public class RescalingTestUtils {
+
+    /** A parallel source with definite keys. */
+    public static class DefiniteKeySource extends RichParallelSourceFunction<Integer> {
+
+        private static final long serialVersionUID = -400066323594122516L;
+
+        private final int numberKeys;
+        private final int numberElements;
+        private final boolean terminateAfterEmission;
+
+        protected int counter = 0;
+
+        private boolean running = true;
+
+        public DefiniteKeySource(
+                int numberKeys, int numberElements, boolean terminateAfterEmission) {
+            this.numberKeys = numberKeys;
+            this.numberElements = numberElements;
+            this.terminateAfterEmission = terminateAfterEmission;
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            final Object lock = ctx.getCheckpointLock();
+            final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+            while (running) {
+
+                if (counter < numberElements) {
+                    synchronized (lock) {
+                        for (int value = subtaskIndex;
+                                value < numberKeys;
+                                value += getRuntimeContext().getNumberOfParallelSubtasks()) {
+                            ctx.collect(value);
+                        }
+                        counter++;
+                    }
+                } else {
+                    if (terminateAfterEmission) {
+                        running = false;
+                    } else {
+                        Thread.sleep(100);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    /** A flatMapper with the index of subtask. */
+    public static class SubtaskIndexFlatMapper
+            extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private static final long serialVersionUID = 5273172591283191348L;
+
+        public static CountDownLatch workCompletedLatch = new CountDownLatch(1);
+        private static AtomicInteger completedCheckpointNum;

Review comment:
       `completedCheckpointNum` is redundant now that `countDownLatch` has been added to the source, so I deleted it.
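
A rough sketch of the latch pattern referred to here, in plain Java. How the PR's source actually counts the latch down is not shown in this thread, so the worker threads below are only an assumed stand-in for the parallel source subtasks, and `LatchSketch` is a hypothetical name. The test thread blocks until every worker has signalled, which makes a separate checkpoint counter unnecessary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of latch-based coordination; not code from the PR. */
final class LatchSketch {

    private LatchSketch() {}

    /**
     * Starts one worker per subtask; each counts the latch down exactly once.
     * Returns true if all workers signalled within ten seconds.
     */
    static boolean runWorkersAndAwait(int parallelism) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(parallelism);
        for (int i = 0; i < parallelism; i++) {
            new Thread(latch::countDown).start();
        }
        return latch.await(10, TimeUnit.SECONDS);
    }
}
```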







[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6035c75b7f09958a7b995635e34923aac40a743f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6035c75b7f09958a7b995635e34923aac40a743f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103) 
   * d4fc20e109703ae85eb8a7786973824a8fb081ac Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207) 
   * 6035c75b7f09958a7b995635e34923aac40a743f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] Myasuka closed pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
Myasuka closed pull request #19033:
URL: https://github.com/apache/flink/pull/19033


   





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d4fc20e109703ae85eb8a7786973824a8fb081ac Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d4fc20e109703ae85eb8a7786973824a8fb081ac Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] Myasuka commented on a change in pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
Myasuka commented on a change in pull request #19033:
URL: https://github.com/apache/flink/pull/19033#discussion_r824522047



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -861,6 +1010,16 @@ public void initializeState(FunctionInitializationContext context) throws Except
                     context.getKeyedStateStore()
                             .getState(new ValueStateDescriptor<>("sum", Integer.class, 0));
         }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            completedCheckpointNum.getAndIncrement();
+        }
+
+        @Override
+        public void notifyCheckpointAborted(long checkpointId) throws Exception {
+            CheckpointListener.super.notifyCheckpointAborted(checkpointId);
+        }

Review comment:
       We don't need to implement this method.
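
The point above can be illustrated with a minimal sketch. The quoted override only delegates to `CheckpointListener.super.notifyCheckpointAborted(...)`, which suggests the interface already supplies a default body. The `Listener` interface and `CountingListener` class below are hypothetical stand-ins, not Flink types; they only show that an implementor inherits a default method without overriding it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a listener interface that ships a default method.
interface Listener {
    void notifyComplete(long checkpointId);

    // Default no-op: implementors inherit it unless they need custom behavior.
    default void notifyAborted(long checkpointId) {}
}

// Only the abstract method is implemented; an override that merely calls
// Listener.super.notifyAborted(...) would add nothing.
final class CountingListener implements Listener {
    final AtomicInteger completed = new AtomicInteger();

    @Override
    public void notifyComplete(long checkpointId) {
        completed.getAndIncrement();
    }
}
```

Calling `notifyAborted` on a `CountingListener` resolves to the inherited default, so dropping the override leaves behavior unchanged.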

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -290,6 +301,139 @@ public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMax
         }
     }
 
+    @Test
+    public void testCheckpointRescalingInKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(false, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(true, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(false, true);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(true, true);
+    }
+
+    /**
+     * Tests that a job with purely keyed state can be restarted from a checkpoint with a different
+     * parallelism.
+     */
+    public void testCheckpointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism)
+            throws Exception {
+        final int numberKeys = 42;
+        final int numberElements = 1000;
+        final int numberElements2 = 500;
+        final int parallelism = scaleOut ? 3 : 4;
+        final int parallelism2 = scaleOut ? 4 : 3;
+        final int maxParallelism = 13;
+        Duration timeout = Duration.ofMinutes(3);
+        Deadline deadline = Deadline.now().plus(timeout);
+        ClusterClient<?> client = cluster.getClusterClient();
+        try {
+            JobGraph jobGraph =
+                    createJobGraphWithKeyedState(
+                            parallelism, maxParallelism, numberKeys, numberElements, false, 100);
+            final JobID jobID = jobGraph.getJobID();
+            // make sure the job does not finish before we cancel it
+            StateSourceBase.canFinishLatch = new CountDownLatch(1);
+            client.submitJob(jobGraph).get();
+            // wait till the sources have emitted numberElements for each key
+            assertTrue(
+                    SubtaskIndexFlatMapper.workCompletedLatch.await(
+                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
+            // verify the current state
+            Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
+            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+                expectedResult.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelism, parallelism, keyGroupIndex),
+                                numberElements * key));
+            }
+            assertEquals(expectedResult, actualResult);
+            // don't cancel source before we get a completed checkpoint
+            while (SubtaskIndexFlatMapper.completedCheckpointNum.get() / parallelism < 1) {
+                Thread.sleep(10);
+            }
+            String checkpointPath = getLastedCheckpointPath();
+            while (checkpointPath == null) {
+                Thread.sleep(10);
+                checkpointPath = getLastedCheckpointPath();
+            }
+            // clear the CollectionSink set for the restarted job
+            CollectionSink.clearElementsSet();
+            StateSourceBase.canFinishLatch.countDown();
+            client.cancel(jobID).get();
+            while (!getRunningJobs(client).isEmpty()) {
+                Thread.sleep(50);
+            }
+            int restoreMaxParallelism =
+                    deriveMaxParallelism ? JobVertex.MAX_PARALLELISM_DEFAULT : maxParallelism;
+            JobGraph scaledJobGraph =
+                    createJobGraphWithKeyedState(
+                            parallelism2,
+                            restoreMaxParallelism,
+                            numberKeys,
+                            numberElements + numberElements2,
+                            true,
+                            100);
+            scaledJobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(checkpointPath));
+            submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
+            assertTrue(
+                    SubtaskIndexFlatMapper.workCompletedLatch.await(
+                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
+            Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
+            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+                expectedResult2.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelism, parallelism2, keyGroupIndex),
+                                key * (numberElements + numberElements2)));
+            }
+            assertEquals(expectedResult2, actualResult2);
+        } finally {
+            // clear the CollectionSink set for the restarted job
+            CollectionSink.clearElementsSet();
+        }
+    }
+
+    private String getLastedCheckpointPath() {
+        String checkpointPath = null;
+        int ckpId = 0;
+        for (File instanceFile : checkpointDir.listFiles()) {
+            for (File file : instanceFile.listFiles()) {
+                if (!file.getName().startsWith("chk-")) {
+                    continue;
+                }
+                boolean flag = false;

Review comment:
       I think naming this variable `foundCheckpoint` would be clearer.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -95,23 +98,28 @@
     private static final int slotsPerTaskManager = 2;
     private static final int numSlots = numTaskManagers * slotsPerTaskManager;
 
-    @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}")
+    @Parameterized.Parameters(
+            name = "backend = {0}, buffersPerChannel = {1}, incrementalFlag = {2}")

Review comment:
       ```suggestion
               name = "backend = {0}, buffersPerChannel = {1}, incrementalCheckpoint = {2}")
   ```

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -290,6 +301,139 @@ public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMax
         }
     }
 
+    @Test
+    public void testCheckpointRescalingInKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(false, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(true, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(false, true);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(true, true);
+    }
+
+    /**
+     * Tests that a job with purely keyed state can be restarted from a checkpoint with a different
+     * parallelism.
+     */
+    public void testCheckpointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism)

Review comment:
       I believe most of this code could be shared with `testSavepointRescalingKeyedState`; let's simplify the logic.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -290,6 +301,139 @@ public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMax
         }
     }
 
+    @Test
+    public void testCheckpointRescalingInKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(false, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(true, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(false, true);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(true, true);
+    }
+
+    /**
+     * Tests that a job with purely keyed state can be restarted from a checkpoint with a different
+     * parallelism.
+     */
+    public void testCheckpointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism)
+            throws Exception {
+        final int numberKeys = 42;
+        final int numberElements = 1000;
+        final int numberElements2 = 500;
+        final int parallelism = scaleOut ? 3 : 4;
+        final int parallelism2 = scaleOut ? 4 : 3;
+        final int maxParallelism = 13;
+        Duration timeout = Duration.ofMinutes(3);
+        Deadline deadline = Deadline.now().plus(timeout);
+        ClusterClient<?> client = cluster.getClusterClient();
+        try {
+            JobGraph jobGraph =
+                    createJobGraphWithKeyedState(
+                            parallelism, maxParallelism, numberKeys, numberElements, false, 100);
+            final JobID jobID = jobGraph.getJobID();
+            // make sure the job does not finish before we cancel it
+            StateSourceBase.canFinishLatch = new CountDownLatch(1);
+            client.submitJob(jobGraph).get();
+            // wait till the sources have emitted numberElements for each key
+            assertTrue(
+                    SubtaskIndexFlatMapper.workCompletedLatch.await(
+                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
+            // verify the current state
+            Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
+            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+                expectedResult.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelism, parallelism, keyGroupIndex),
+                                numberElements * key));
+            }
+            assertEquals(expectedResult, actualResult);
+            // don't cancel source before we get a completed checkpoint
+            while (SubtaskIndexFlatMapper.completedCheckpointNum.get() / parallelism < 1) {
+                Thread.sleep(10);
+            }
+            String checkpointPath = getLastedCheckpointPath();
+            while (checkpointPath == null) {
+                Thread.sleep(10);
+                checkpointPath = getLastedCheckpointPath();
+            }
+            // clear the CollectionSink set for the restarted job
+            CollectionSink.clearElementsSet();
+            StateSourceBase.canFinishLatch.countDown();
+            client.cancel(jobID).get();
+            while (!getRunningJobs(client).isEmpty()) {
+                Thread.sleep(50);
+            }
+            int restoreMaxParallelism =
+                    deriveMaxParallelism ? JobVertex.MAX_PARALLELISM_DEFAULT : maxParallelism;
+            JobGraph scaledJobGraph =
+                    createJobGraphWithKeyedState(
+                            parallelism2,
+                            restoreMaxParallelism,
+                            numberKeys,
+                            numberElements + numberElements2,
+                            true,
+                            100);
+            scaledJobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(checkpointPath));
+            submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
+            assertTrue(
+                    SubtaskIndexFlatMapper.workCompletedLatch.await(
+                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
+            Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
+            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+                expectedResult2.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelism, parallelism2, keyGroupIndex),
+                                key * (numberElements + numberElements2)));
+            }
+            assertEquals(expectedResult2, actualResult2);
+        } finally {
+            // clear the CollectionSink set for the restarted job
+            CollectionSink.clearElementsSet();
+        }
+    }
+
+    private String getLastedCheckpointPath() {
+        String checkpointPath = null;
+        int ckpId = 0;
+        for (File instanceFile : checkpointDir.listFiles()) {
+            for (File file : instanceFile.listFiles()) {
+                if (!file.getName().startsWith("chk-")) {
+                    continue;
+                }
+                boolean flag = false;
+                for (File child : file.listFiles()) {
+                    if (child.getName().endsWith("_metadata")) {

Review comment:
       We can change `_metadata` to `AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME`.
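
As a rough sketch of what the helper under review is doing, the class below looks for the highest-numbered `chk-N` directory that contains a metadata file. It uses only `java.io.File`. The class name, method name, and the local `METADATA_FILE_NAME` constant are assumptions for illustration; in the real test the constant would come from `AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME`, as suggested above.

```java
import java.io.File;

final class LatestCheckpointFinder {
    // Assumption: local stand-in for AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME.
    static final String METADATA_FILE_NAME = "_metadata";
    static final String CHECKPOINT_DIR_PREFIX = "chk-";

    /** Returns the path of the newest chk-N directory holding a metadata file, or null. */
    static String findLatestCompletedCheckpoint(File checkpointRoot) {
        String latestPath = null;
        long latestId = -1;
        File[] jobDirs = checkpointRoot.listFiles(File::isDirectory);
        if (jobDirs == null) {
            return null; // root is missing or not a directory
        }
        for (File jobDir : jobDirs) {
            File[] candidates =
                    jobDir.listFiles(
                            f -> f.isDirectory() && f.getName().startsWith(CHECKPOINT_DIR_PREFIX));
            if (candidates == null) {
                continue;
            }
            for (File candidate : candidates) {
                // A checkpoint counts as completed only once its metadata file exists.
                boolean foundCheckpoint = new File(candidate, METADATA_FILE_NAME).isFile();
                if (!foundCheckpoint) {
                    continue;
                }
                long id;
                try {
                    id = Long.parseLong(
                            candidate.getName().substring(CHECKPOINT_DIR_PREFIX.length()));
                } catch (NumberFormatException e) {
                    continue; // not a chk-<number> directory
                }
                // Compare numerically so that chk-10 ranks above chk-2.
                if (id > latestId) {
                    latestId = id;
                    latestPath = candidate.getAbsolutePath();
                }
            }
        }
        return latestPath;
    }
}
```

Parsing the numeric suffix avoids relying on directory listing order, which `File.listFiles()` does not guarantee.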

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
##########
@@ -153,30 +149,15 @@ private static void deleteRange(
             RocksDB db,
             List<ColumnFamilyHandle> columnFamilyHandles,
             byte[] beginKeyBytes,
-            byte[] endKeyBytes,
-            @Nonnegative long writeBatchSize)
+            byte[] endKeyBytes)
             throws RocksDBException {
 
         for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-            try (ReadOptions readOptions = new ReadOptions();
-                    RocksIteratorWrapper iteratorWrapper =
-                            RocksDBOperationUtils.getRocksIterator(
-                                    db, columnFamilyHandle, readOptions);
-                    RocksDBWriteBatchWrapper writeBatchWrapper =
-                            new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
-
-                iteratorWrapper.seek(beginKeyBytes);
-
-                while (iteratorWrapper.isValid()) {
-                    final byte[] currentKey = iteratorWrapper.key();
-                    if (beforeThePrefixBytes(currentKey, endKeyBytes)) {
-                        writeBatchWrapper.remove(columnFamilyHandle, currentKey);
-                    } else {
-                        break;
-                    }
-                    iteratorWrapper.next();
-                }
-            }
+            // Using RocksDB's deleteRange will take advantage of delete
+            // tombstones, which mark the range as deleted.
+            //
+            // https://github.com/facebook/rocksdb/blob/bcd32560dd5898956b9d24553c2bb3c1b1d2319f/include/rocksdb/db.h#L357-L371

Review comment:
       Maybe we could point to the corresponding FRocksDB link instead.
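
To make the change in this hunk easier to follow, here is a small in-memory model of the two strategies. It does not use RocksDB: a `TreeMap` over unsigned-lexicographic `byte[]` keys stands in for one column family, and all names are hypothetical. It assumes the half-open `[begin, end)` semantics that RocksDB documents for `DeleteRange`, and it simplifies the removed `beforeThePrefixBytes` check to a full-key comparison. Requires Java 9+ for `Arrays.compareUnsigned`.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;

final class DeleteRangeModel {
    // Byte-wise unsigned ordering, as a stand-in for the store's key order.
    static final Comparator<byte[]> KEY_ORDER = Arrays::compareUnsigned;

    static NavigableMap<byte[], String> newStore() {
        return new TreeMap<>(KEY_ORDER);
    }

    /** Old style: seek to begin, then delete key by key until end is reached. */
    static int deleteByIteration(NavigableMap<byte[], String> store, byte[] begin, byte[] end) {
        int removed = 0;
        Iterator<byte[]> it = store.tailMap(begin, true).keySet().iterator();
        while (it.hasNext()) {
            byte[] key = it.next();
            if (KEY_ORDER.compare(key, end) >= 0) {
                break; // reached the exclusive upper bound
            }
            it.remove();
            removed++;
        }
        return removed;
    }

    /** New style: one range operation covering [begin, end). */
    static void deleteRange(NavigableMap<byte[], String> store, byte[] begin, byte[] end) {
        store.subMap(begin, true, end, false).clear();
    }

    public static void main(String[] args) {
        NavigableMap<byte[], String> store = newStore();
        for (int i = 1; i <= 4; i++) {
            store.put(new byte[] {(byte) i}, "value-" + i);
        }
        deleteRange(store, new byte[] {2}, new byte[] {4});
        System.out.println("remaining keys: " + store.size());
    }
}
```

The model only shows that both strategies remove the same keys. It says nothing about cost: in the real store the range delete writes a tombstone rather than touching every key, which is where the speedup in this PR is expected to come from.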






[GitHub] [flink] Myasuka commented on a change in pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
Myasuka commented on a change in pull request #19033:
URL: https://github.com/apache/flink/pull/19033#discussion_r827588561



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -135,14 +146,16 @@ public void setup() throws Exception {
 
             Configuration config = new Configuration();
 
-            final File checkpointDir = temporaryFolder.newFolder();
+            checkpointDir = temporaryFolder.newFolder();
             final File savepointDir = temporaryFolder.newFolder();
 
             config.setString(StateBackendOptions.STATE_BACKEND, currentBackend);
             config.setString(
                     CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
             config.setString(
                     CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+            config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incrementalCheckpoint);
+            config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 20);

Review comment:
       I think the large number of retained checkpoints here is meant to prevent the detected checkpoint from being deleted by a later successful checkpoint before the next rescale. However, this is not totally safe unless you configure this value to an extremely large number.
   
   I think we can refer to the logic of `ResumeCheckpointManuallyITCase` to add another `RescaleCheckpointManuallyITCase`.
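
The concern about the retention count can be shown with a toy model. The class below is hypothetical and is not Flink's checkpoint store; it only assumes that, once more than `maxRetained` checkpoints have completed, the oldest retained one is dropped. Under that assumption a path detected by scanning the directory stays valid only for a bounded number of subsequent checkpoints.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class RetainedCheckpointsModel {
    private final int maxRetained;
    private final Deque<Long> retained = new ArrayDeque<>();

    RetainedCheckpointsModel(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be at least 1");
        }
        this.maxRetained = maxRetained;
    }

    /** Registers a completed checkpoint and evicts the oldest ones beyond the limit. */
    void completeCheckpoint(long checkpointId) {
        retained.addLast(checkpointId);
        while (retained.size() > maxRetained) {
            retained.removeFirst();
        }
    }

    boolean isRetained(long checkpointId) {
        return retained.contains(checkpointId);
    }

    public static void main(String[] args) {
        RetainedCheckpointsModel model = new RetainedCheckpointsModel(20);
        model.completeCheckpoint(1); // the checkpoint the test "detected"
        for (long id = 2; id <= 21; id++) {
            model.completeCheckpoint(id);
        }
        System.out.println("checkpoint 1 still retained: " + model.isRetained(1));
    }
}
```

With a short checkpoint interval and a slow test machine, enough checkpoints can complete between detection and restore to evict the detected one. A larger retention count only widens the window, which is why a test that controls when checkpoints are taken is the safer design.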






[GitHub] [flink] fredia commented on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
fredia commented on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1073462371


   @Aitozi Thank you for your interest. As you mentioned, 
   > we could always choose one as the base db, since the DeleteRange can finish in no time. 
   
   In theory, setting the default value to 0 is most suitable, since `deleteRange()` takes less time than creating a new RocksDB instance and then scanning and re-inserting the data. We ran some experiments with different thresholds, and I created a ticket, https://issues.apache.org/jira/browse/FLINK-26757, about this; please take a look.
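
To make the threshold discussion concrete, here is a toy model of picking a base DB by overlap. It is a sketch under assumptions and not Flink's actual selection code: the class, the `Range` type, and `chooseBase` are hypothetical, and the overlap fraction is assumed to be the share of a restored handle's key groups that fall inside the target range.

```java
final class BaseDbChoice {
    /** Inclusive key-group range (hypothetical stand-in type). */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        int size() {
            return end - start + 1;
        }

        int overlapWith(Range other) {
            int lo = Math.max(start, other.start);
            int hi = Math.min(end, other.end);
            return Math.max(0, hi - lo + 1);
        }
    }

    /**
     * Returns the index of the handle with the largest overlap fraction that
     * meets the threshold, or -1 if none qualifies (meaning: start from an
     * empty DB and copy the data in).
     */
    static int chooseBase(Range[] handles, Range target, double threshold) {
        int best = -1;
        double bestFraction = 0.0;
        for (int i = 0; i < handles.length; i++) {
            int overlap = handles[i].overlapWith(target);
            if (overlap == 0) {
                continue; // nothing reusable in this handle
            }
            double fraction = (double) overlap / handles[i].size();
            if (fraction >= threshold && fraction > bestFraction) {
                best = i;
                bestFraction = fraction;
            }
        }
        return best;
    }
}
```

In this model a threshold of 0 means any overlapping handle can serve as the base, with the non-overlapping key groups clipped by a range delete. A higher threshold falls back to the empty-DB path more often.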




[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * d4fc20e109703ae85eb8a7786973824a8fb081ac Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * 28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103) 
   * d4fc20e109703ae85eb8a7786973824a8fb081ac UNKNOWN
   






[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * 2bc97da2f6306049c4c3a891866977132581ebbb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33328) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * 768a998a8989828f816cbdfa009737529ec9dbd0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984) 
   * 697f9aad6e5d3a630ccfb16dd813cbb36666da52 UNKNOWN
   



[GitHub] [flink] fredia commented on a change in pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
fredia commented on a change in pull request #19033:
URL: https://github.com/apache/flink/pull/19033#discussion_r829831794



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/RescalingTestUtils.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test utilities for rescaling. */
+public class RescalingTestUtils {
+
+    /** A parallel source with definite keys. */
+    public static class DefiniteKeySource extends RichParallelSourceFunction<Integer> {
+
+        private static final long serialVersionUID = -400066323594122516L;
+
+        private final int numberKeys;
+        private final int numberElements;
+        private final boolean terminateAfterEmission;
+
+        protected int counter = 0;
+
+        private boolean running = true;
+
+        public DefiniteKeySource(
+                int numberKeys, int numberElements, boolean terminateAfterEmission) {
+            this.numberKeys = numberKeys;
+            this.numberElements = numberElements;
+            this.terminateAfterEmission = terminateAfterEmission;
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            final Object lock = ctx.getCheckpointLock();
+            final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+            while (running) {
+
+                if (counter < numberElements) {
+                    synchronized (lock) {
+                        for (int value = subtaskIndex;
+                                value < numberKeys;
+                                value += getRuntimeContext().getNumberOfParallelSubtasks()) {
+                            ctx.collect(value);
+                        }
+                        counter++;
+                    }
+                } else {
+                    if (terminateAfterEmission) {
+                        running = false;
+                    } else {
+                        Thread.sleep(100);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    /** A flatMapper with the index of subtask. */
+    public static class SubtaskIndexFlatMapper
+            extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private static final long serialVersionUID = 5273172591283191348L;
+
+        public static CountDownLatch workCompletedLatch = new CountDownLatch(1);
+        private static AtomicInteger completedCheckpointNum;

Review comment:
       `completedCheckpointNum` is redundant now that a `CountDownLatch` has been added to the source; it can be deleted.
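
A minimal, self-contained sketch of why a latch can make a separate completion counter unnecessary. This is not the code from the PR: the class `LatchSketch` and the method `runAndWait` are hypothetical names used only for illustration. The worker thread signals completion once through a `CountDownLatch`, and the waiting thread blocks on it instead of polling a counter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not part of the Flink test utilities.
final class LatchSketch {

    /**
     * Starts a worker that "emits" the given number of elements, then waits
     * until the worker signals completion or the timeout expires.
     * Returns true if completion was signalled in time.
     */
    static boolean runAndWait(int elements, long timeoutMs) {
        final CountDownLatch workCompleted = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            long sum = 0;
            for (int i = 0; i < elements; i++) {
                sum += i; // stand-in for emitting one element
            }
            // A single signal replaces any shared "how far are we" counter.
            workCompleted.countDown();
        });
        worker.start();
        try {
            boolean finished = workCompleted.await(timeoutMs, TimeUnit.MILLISECONDS);
            worker.join();
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAndWait(1_000, 5_000));
    }
}
```

The waiting side only needs `await`, so no additional shared state has to be kept consistent between the function and the test.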







[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * d4fc20e109703ae85eb8a7786973824a8fb081ac Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207) 
   * dd92512b30f07e472d9e9e5377d0b4e801214d00 UNKNOWN
   



[GitHub] [flink] fredia commented on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
fredia commented on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1072267658


   We tested one scale-out (3 -> 4) case. Based on the mean values below, `deleteRange` reduced the initialization time by roughly 84% to 89% compared with `delete`.
   
   | method | parallelism before | parallelism after | checkpoint size | initialization time (ms) |
   | -- | -- | -- | -- | -- |
   | delete | 3 | 4 | 282MB | 6935±995 |
   | deleteRange | 3 | 4 | 282MB | 1130±523 |
   | delete | 3 | 4 | 3.75GB | 32815±8992 |
   | deleteRange | 3 | 4 | 3.75GB | 4019±618 |
   | delete | 3 | 4 | 122.3GB | 1094358±316691 |
   | deleteRange | 3 | 4 | 122.3GB | 123866±16329 |
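
To make the difference between the two strategies concrete, here is a small sketch that uses `java.util.TreeMap` as a stand-in for a sorted key-value store. It is an assumption-laden illustration, not RocksDB and not the PR's implementation: `RangeClipSketch`, `clipPerKey`, and `clipByRange` are hypothetical names. Both methods drop every key outside `[lo, hi)`. The first issues one removal per out-of-range key, while the second issues two range removals regardless of how many keys fall outside the range.

```java
import java.util.Iterator;
import java.util.TreeMap;

// Hypothetical illustration with an in-memory sorted map; not RocksDB code.
final class RangeClipSketch {

    /** Removes keys outside [lo, hi) one by one; returns the number of delete operations issued. */
    static int clipPerKey(TreeMap<Integer, String> db, int lo, int hi) {
        int deletes = 0;
        Iterator<Integer> it = db.keySet().iterator();
        while (it.hasNext()) {
            int key = it.next();
            if (key < lo || key >= hi) {
                it.remove(); // one delete per out-of-range key
                deletes++;
            }
        }
        return deletes;
    }

    /** Removes the same keys with two range operations; returns the number of operations issued. */
    static int clipByRange(TreeMap<Integer, String> db, int lo, int hi) {
        db.headMap(lo, false).clear(); // everything strictly below lo
        db.tailMap(hi, true).clear();  // everything at or above hi
        return 2;
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> perKey = new TreeMap<>();
        TreeMap<Integer, String> byRange = new TreeMap<>();
        for (int k = 0; k < 10; k++) {
            perKey.put(k, "v" + k);
            byRange.put(k, "v" + k);
        }
        System.out.println("per-key operations: " + clipPerKey(perKey, 3, 7));
        System.out.println("range operations:   " + clipByRange(byRange, 3, 7));
        System.out.println("same result:        " + perKey.equals(byRange));
    }
}
```

In this toy model the per-key cost grows with the number of keys outside the target range, while the range variant stays at two operations, which is consistent with the direction of the measurements above. The actual cost of a range deletion in a real store depends on that store's internals.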





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   ## CI report:
   
   * 28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103) 
   * d4fc20e109703ae85eb8a7786973824a8fb081ac Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207) 
   



[GitHub] [flink] Myasuka commented on a change in pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
Myasuka commented on a change in pull request #19033:
URL: https://github.com/apache/flink/pull/19033#discussion_r829757672



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.checkpointing.utils.RescalingTestUtils;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestUtils;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** Test checkpoint rescaling for incremental rocksdb. */
+public class RescaleCheckpointManuallyITCase {
+
+    private static final int NUM_TASK_MANAGERS = 2;
+    private static final int SLOTS_PER_TASK_MANAGER = 2;
+
+    private static MiniClusterWithClientResource cluster;
+    private File checkpointDir;
+
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Before
+    public void setup() throws Exception {
+        Configuration config = new Configuration();
+
+        checkpointDir = temporaryFolder.newFolder();
+
+        config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        config.setString(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(config)
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+                                .build());
+        cluster.before();
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(false, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(true, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(false, true);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(true, true);
+    }

Review comment:
       I think tests for rescaling in and rescaling out are enough here.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.checkpointing.utils.RescalingTestUtils;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestUtils;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** Test checkpoint rescaling for incremental rocksdb. */
+public class RescaleCheckpointManuallyITCase {

Review comment:
       This test should extend `TestLogger`.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.checkpointing.utils.RescalingTestUtils;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestUtils;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** Test checkpoint rescaling for incremental rocksdb. */
+public class RescaleCheckpointManuallyITCase {
+
+    private static final int NUM_TASK_MANAGERS = 2;
+    private static final int SLOTS_PER_TASK_MANAGER = 2;
+
+    private static MiniClusterWithClientResource cluster;
+    private File checkpointDir;
+
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Before
+    public void setup() throws Exception {
+        Configuration config = new Configuration();
+
+        checkpointDir = temporaryFolder.newFolder();
+
+        config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        config.setString(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(config)
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+                                .build());
+        cluster.before();
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(false, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(true, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(false, true);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(true, true);
+    }
+
+    /**
+     * Tests that a job with purely keyed state can be restarted from a checkpoint with a different
+     * parallelism.
+     */
+    public void testCheckpointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism)
+            throws Exception {
+        final int numberKeys = 42;
+        final int numberElements = 1000;
+        final int numberElements2 = 500;
+        final int parallelism = scaleOut ? 3 : 4;
+        final int parallelism2 = scaleOut ? 4 : 3;
+        final int maxParallelism = 13;
+
+        cluster.before();
+
+        ClusterClient<?> client = cluster.getClusterClient();
+        String checkpointPath =
+                runJobAndGetCheckpoint(
+                        numberKeys,
+                        numberElements,
+                        parallelism,
+                        maxParallelism,
+                        client,
+                        checkpointDir);
+
+        assertNotNull(checkpointPath);
+
+        int restoreMaxParallelism =
+                deriveMaxParallelism ? JobVertex.MAX_PARALLELISM_DEFAULT : maxParallelism;
+
+        restoreAndAssert(
+                parallelism2,
+                restoreMaxParallelism,
+                maxParallelism,
+                numberKeys,
+                numberElements2,
+                numberElements + numberElements2,
+                client,
+                checkpointPath);
+    }
+
+    private static String runJobAndGetCheckpoint(
+            int numberKeys,
+            int numberElements,
+            int parallelism,
+            int maxParallelism,
+            ClusterClient<?> client,
+            File checkpointDir)
+            throws Exception {
+        try {
+            JobGraph jobGraph =
+                    createJobGraphWithKeyedState(
+                            parallelism, maxParallelism, numberKeys, numberElements, false, 100);
+            NotifyingDefiniteKeySource.countDownLatch = new CountDownLatch(parallelism);
+            client.submitJob(jobGraph).get();
+            NotifyingDefiniteKeySource.countDownLatch.await();
+
+            RescalingTestUtils.SubtaskIndexFlatMapper.workCompletedLatch.await();
+
+            // verify the current state
+            Set<Tuple2<Integer, Integer>> actualResult =
+                    RescalingTestUtils.CollectionSink.getElementsSet();
+
+            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
+
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+                expectedResult.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelism, parallelism, keyGroupIndex),
+                                numberElements * key));
+            }
+
+            assertEquals(expectedResult, actualResult);
+            NotifyingDefiniteKeySource.countDownLatch.await();
+
+            waitUntilExternalizedCheckpointCreated(checkpointDir);
+            client.cancel(jobGraph.getJobID()).get();
+            TestUtils.waitUntilCanceled(jobGraph.getJobID(), client);
+            return TestUtils.getMostRecentCompletedCheckpoint(checkpointDir).getAbsolutePath();
+        } finally {
+            RescalingTestUtils.CollectionSink.clearElementsSet();
+        }
+    }
+
+    private void restoreAndAssert(
+            int restoreParallelism,
+            int restoreMaxParallelism,
+            int maxParallelismBefore,
+            int numberKeys,
+            int numberElements,
+            int numberElementsExpect,
+            ClusterClient<?> client,
+            String restorePath)
+            throws Exception {
+        try {
+
+            JobGraph scaledJobGraph =
+                    createJobGraphWithKeyedState(
+                            restoreParallelism,
+                            restoreMaxParallelism,
+                            numberKeys,
+                            numberElements,
+                            true,
+                            100);
+
+            scaledJobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(restorePath));
+
+            submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
+
+            Set<Tuple2<Integer, Integer>> actualResult2 =
+                    RescalingTestUtils.CollectionSink.getElementsSet();
+
+            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
+
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex =
+                        KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelismBefore);
+                expectedResult2.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelismBefore, restoreParallelism, keyGroupIndex),
+                                key * numberElementsExpect));
+            }
+            assertEquals(expectedResult2, actualResult2);
+        } finally {
+            RescalingTestUtils.CollectionSink.clearElementsSet();
+        }
+    }
+
+    private static void waitUntilExternalizedCheckpointCreated(File checkpointDir)
+            throws InterruptedException, IOException {
+        while (true) {
+            Thread.sleep(50);
+            Optional<File> externalizedCheckpoint =
+                    TestUtils.getMostRecentCompletedCheckpointMaybe(checkpointDir);
+            if (externalizedCheckpoint.isPresent()) {
+                break;
+            }
+        }
+    }

Review comment:
       We can move this helper into `TestUtils` so that both `ResumeCheckpointManuallyITCase` and `RescaleCheckpointManuallyITCase` can use it.
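
       A minimal sketch of what such a shared polling helper might look like, assuming a bounded wait is preferred over the unbounded `while (true)` loop above. `PollingUtils` and `waitUntilPresent` are hypothetical names, not part of the PR or of `TestUtils`. The real probe (`TestUtils.getMostRecentCompletedCheckpointMaybe`) is declared with `IOException` in the diff above, so a caller would need to wrap it or the helper would need its own functional interface.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical shared polling helper; the name and signature are illustrative only. */
final class PollingUtils {

    private PollingUtils() {}

    /**
     * Polls {@code probe} until it yields a value, sleeping {@code interval} between
     * attempts, and fails once {@code timeout} has elapsed.
     */
    static <T> T waitUntilPresent(
            Supplier<Optional<T>> probe, Duration timeout, Duration interval)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Optional<T> result = probe.get();
            if (result.isPresent()) {
                return result.get();
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Condition not met within " + timeout);
            }
            Thread.sleep(interval.toMillis());
        }
    }
}
```

       With a deadline, a job that never produces a checkpoint fails the test with a clear message instead of hanging until the CI watchdog kills it.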

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/RescalingTestUtils.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test utilities for rescaling. */
+public class RescalingTestUtils {
+
+    /** A parallel source with definite keys. */
+    public static class DefiniteKeySource extends RichParallelSourceFunction<Integer> {
+
+        private static final long serialVersionUID = -400066323594122516L;
+
+        private final int numberKeys;
+        private final int numberElements;
+        private final boolean terminateAfterEmission;
+
+        protected int counter = 0;
+
+        private boolean running = true;
+
+        public DefiniteKeySource(
+                int numberKeys, int numberElements, boolean terminateAfterEmission) {
+            this.numberKeys = numberKeys;
+            this.numberElements = numberElements;
+            this.terminateAfterEmission = terminateAfterEmission;
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            final Object lock = ctx.getCheckpointLock();
+            final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+            while (running) {
+
+                if (counter < numberElements) {
+                    synchronized (lock) {
+                        for (int value = subtaskIndex;
+                                value < numberKeys;
+                                value += getRuntimeContext().getNumberOfParallelSubtasks()) {
+                            ctx.collect(value);
+                        }
+                        counter++;
+                    }
+                } else {
+                    if (terminateAfterEmission) {
+                        running = false;
+                    } else {
+                        Thread.sleep(100);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    /** A flatMapper with the index of subtask. */
+    public static class SubtaskIndexFlatMapper
+            extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private static final long serialVersionUID = 5273172591283191348L;
+
+        public static CountDownLatch workCompletedLatch = new CountDownLatch(1);
+        private static AtomicInteger completedCheckpointNum;

Review comment:
       There is no need to make `completedCheckpointNum` static.
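
       To illustrate one hazard of the pattern, here is a small stand-alone sketch. `SharedCounter` and `OwnCounter` are hypothetical stand-ins, not classes from the PR. When a static field is assigned in the constructor, every new instance in the same JVM replaces the counter seen by all existing instances.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative stand-ins only; neither nested class is part of the PR. */
final class StaticCounterDemo {

    private StaticCounterDemo() {}

    /** Mirrors the reviewed pattern: a static field assigned in the constructor. */
    static final class SharedCounter {
        private static AtomicInteger completed;

        SharedCounter() {
            // Replaces the counter for ALL instances in this JVM.
            completed = new AtomicInteger();
        }

        void onCheckpointComplete() {
            completed.getAndIncrement();
        }

        int completedCheckpoints() {
            return completed.get();
        }
    }

    /** Instance-scoped alternative: each object owns its counter. */
    static final class OwnCounter {
        private final AtomicInteger completed = new AtomicInteger();

        void onCheckpointComplete() {
            completed.getAndIncrement();
        }

        int completedCheckpoints() {
            return completed.get();
        }
    }
}
```

       In the sketch, creating a second `SharedCounter` resets the count observed through the first one, while two `OwnCounter` objects stay independent.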

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/RescalingTestUtils.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test utilities for rescaling. */
+public class RescalingTestUtils {
+
+    /** A parallel source with definite keys. */
+    public static class DefiniteKeySource extends RichParallelSourceFunction<Integer> {
+
+        private static final long serialVersionUID = -400066323594122516L;
+
+        private final int numberKeys;
+        private final int numberElements;
+        private final boolean terminateAfterEmission;
+
+        protected int counter = 0;
+
+        private boolean running = true;
+
+        public DefiniteKeySource(
+                int numberKeys, int numberElements, boolean terminateAfterEmission) {
+            this.numberKeys = numberKeys;
+            this.numberElements = numberElements;
+            this.terminateAfterEmission = terminateAfterEmission;
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            final Object lock = ctx.getCheckpointLock();
+            final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+            while (running) {
+
+                if (counter < numberElements) {
+                    synchronized (lock) {
+                        for (int value = subtaskIndex;
+                                value < numberKeys;
+                                value += getRuntimeContext().getNumberOfParallelSubtasks()) {
+                            ctx.collect(value);
+                        }
+                        counter++;
+                    }
+                } else {
+                    if (terminateAfterEmission) {
+                        running = false;
+                    } else {
+                        Thread.sleep(100);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    /** A flatMapper with the index of subtask. */
+    public static class SubtaskIndexFlatMapper
+            extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private static final long serialVersionUID = 5273172591283191348L;
+
+        public static CountDownLatch workCompletedLatch = new CountDownLatch(1);
+        private static AtomicInteger completedCheckpointNum;
+
+        private transient ValueState<Integer> counter;
+        private transient ValueState<Integer> sum;
+
+        private final int numberElements;
+
+        public SubtaskIndexFlatMapper(int numberElements) {
+            this.numberElements = numberElements;
+            this.completedCheckpointNum = new AtomicInteger();
+        }
+
+        @Override
+        public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out)
+                throws Exception {
+
+            int count = counter.value() + 1;
+            counter.update(count);
+
+            int s = sum.value() + value;
+            sum.update(s);
+
+            if (count % numberElements == 0) {
+                out.collect(Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), s));
+                workCompletedLatch.countDown();
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            // all managed, nothing to do.
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            counter =
+                    context.getKeyedStateStore()
+                            .getState(new ValueStateDescriptor<>("counter", Integer.class, 0));
+            sum =
+                    context.getKeyedStateStore()
+                            .getState(new ValueStateDescriptor<>("sum", Integer.class, 0));
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            completedCheckpointNum.getAndIncrement();
+        }
+    }
+
+    /** Sink for collecting results into a collection. */
+    public static class CollectionSink<IN> implements SinkFunction<IN> {
+
+        private static Set<Object> elements =
+                Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>());

Review comment:
       ```suggestion
           private static final Set<Object> elements =
                   Collections.newSetFromMap(new ConcurrentHashMap<>());
       ```

##########
File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
##########
@@ -139,4 +142,11 @@ private static boolean hasMetadata(Path file) {
             return false; // should never happen
         }
     }
+
+    public static void waitUntilCanceled(JobID jobId, ClusterClient<?> client)

Review comment:
       I think `waitUntilJobCanceled` would be a clearer name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] fredia commented on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
fredia commented on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1072267658


   We tested one scale-out (3->4) case. In these runs, `deleteRange` reduced the mean initialization time by roughly 84% to 89%.
   
   |  | parallelism before | parallelism after | checkpoint size | initialization time (ms) |
   | -- | -- | -- | -- | -- |
   | delete | 3 | 4 | 282MB | 6935±995 |
   | deleteRange | 3 | 4 | 282MB | 1130±523 |
   | delete | 3 | 4 | 3.75GB | 32815±8992 |
   | deleteRange | 3 | 4 | 3.75GB | 4019±618 |
   | delete | 3 | 4 | 122.3GB | 1094358±316691 |
   | deleteRange | 3 | 4 | 122.3GB | 123866±16329 |
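
   The relative reduction per checkpoint size can be recomputed from the mean values in the table. The following is only a small arithmetic sketch; `ReductionCheck` and `reductionPercent` are hypothetical names, and the ± spreads are ignored.

```java
/** Recomputes the relative reduction from the mean initialization times in the table. */
final class ReductionCheck {

    private ReductionCheck() {}

    /** Returns how much smaller the deleteRange mean is than the delete mean, in percent. */
    static double reductionPercent(double deleteMillis, double deleteRangeMillis) {
        return 100.0 * (1.0 - deleteRangeMillis / deleteMillis);
    }

    public static void main(String[] args) {
        // {delete mean (ms), deleteRange mean (ms)}, copied from the table.
        double[][] means = {{6935, 1130}, {32815, 4019}, {1094358, 123866}};
        String[] checkpointSizes = {"282MB", "3.75GB", "122.3GB"};
        for (int i = 0; i < means.length; i++) {
            System.out.printf(
                    "%s: %.1f%% reduction%n",
                    checkpointSizes[i], reductionPercent(means[i][0], means[i][1]));
        }
    }
}
```

   Computed on the means alone, the reduction falls between roughly 84% and 89% for the three checkpoint sizes.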








[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d4fc20e109703ae85eb8a7786973824a8fb081ac Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7d1dbec66de54a97fb4a1a9a9304806bbe4da9af UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7d1dbec66de54a97fb4a1a9a9304806bbe4da9af Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807) 
   * 768a998a8989828f816cbdfa009737529ec9dbd0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 768a998a8989828f816cbdfa009737529ec9dbd0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984) 
   * 697f9aad6e5d3a630ccfb16dd813cbb36666da52 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097) 
   * 28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d4fc20e109703ae85eb8a7786973824a8fb081ac Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207) 
   * dd92512b30f07e472d9e9e5377d0b4e801214d00 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] Aitozi commented on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1072973300


   @fredia Thanks for driving this useful improvement 👍🏻. Should we also tune the default value of `state.backend.rocksdb.restore-overlap-fraction-threshold` so that more use cases benefit? IMO, we could always choose one of the state handles as the base DB, since `DeleteRange` finishes almost instantly.
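
For context, here is a minimal sketch of how an overlap-fraction threshold could gate the choice of a base DB during rescaling. Everything in it is an assumption made for illustration: `KeyGroupRange`, `overlap_fraction`, and `choose_base_handle` are hypothetical Python stand-ins, not Flink's actual classes, and the fraction is assumed to be the share of a handle's key groups that falls inside the target range.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class KeyGroupRange:
    """Hypothetical stand-in for a key-group range; both bounds are inclusive."""
    start: int
    end: int

    def size(self) -> int:
        # An empty intersection has end < start, so clamp at zero.
        return max(0, self.end - self.start + 1)

    def intersection(self, other: "KeyGroupRange") -> "KeyGroupRange":
        return KeyGroupRange(max(self.start, other.start), min(self.end, other.end))


def overlap_fraction(handle: KeyGroupRange, target: KeyGroupRange) -> float:
    """Share of the handle's key groups that the target range still needs (assumed definition)."""
    if handle.size() == 0:
        return 0.0
    return handle.intersection(target).size() / handle.size()


def choose_base_handle(
    handles: List[KeyGroupRange], target: KeyGroupRange, threshold: float
) -> Optional[KeyGroupRange]:
    """Pick the handle with the largest overlap fraction that meets the threshold, if any."""
    best, best_fraction = None, -1.0
    for handle in handles:
        fraction = overlap_fraction(handle, target)
        if fraction >= threshold and fraction > best_fraction:
            best, best_fraction = handle, fraction
    return best


# Scale-out example: two old handles, the new subtask owns key groups 0..31.
old_handles = [KeyGroupRange(0, 63), KeyGroupRange(64, 127)]
target = KeyGroupRange(0, 31)

strict = choose_base_handle(old_handles, target, threshold=0.75)  # no handle qualifies
lenient = choose_base_handle(old_handles, target, threshold=0.0)  # handle 0..63 is chosen
```

With a threshold of 0.0 some handle always qualifies, which corresponds to the "always choose one as the base DB" suggestion above. The key groups outside the target range would then still have to be dropped from that base DB, for example with a range delete.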





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208",
       "triggerID" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f6c2933a9edaeefd01729d7325298141894b0f35",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33290",
       "triggerID" : "f6c2933a9edaeefd01729d7325298141894b0f35",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2bc97da2f6306049c4c3a891866977132581ebbb",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2bc97da2f6306049c4c3a891866977132581ebbb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dd92512b30f07e472d9e9e5377d0b4e801214d00 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208) 
   * f6c2933a9edaeefd01729d7325298141894b0f35 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33290) 
   * 2bc97da2f6306049c4c3a891866977132581ebbb UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208",
       "triggerID" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5c753f1d0d5d51551aa452f53f02affd02a6af04",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5c753f1d0d5d51551aa452f53f02affd02a6af04",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dd92512b30f07e472d9e9e5377d0b4e801214d00 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208) 
   * 5c753f1d0d5d51551aa452f53f02affd02a6af04 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208",
       "triggerID" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f6c2933a9edaeefd01729d7325298141894b0f35",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f6c2933a9edaeefd01729d7325298141894b0f35",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dd92512b30f07e472d9e9e5377d0b4e801214d00 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208) 
   * f6c2933a9edaeefd01729d7325298141894b0f35 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208",
       "triggerID" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f6c2933a9edaeefd01729d7325298141894b0f35",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33290",
       "triggerID" : "f6c2933a9edaeefd01729d7325298141894b0f35",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2bc97da2f6306049c4c3a891866977132581ebbb",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33328",
       "triggerID" : "2bc97da2f6306049c4c3a891866977132581ebbb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f6c2933a9edaeefd01729d7325298141894b0f35 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33290) 
   * 2bc97da2f6306049c4c3a891866977132581ebbb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33328) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33097",
       "triggerID" : "697f9aad6e5d3a630ccfb16dd813cbb36666da52",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33103",
       "triggerID" : "28c2e4d372044c76392b3d3d8ec3c68f0cb6f6b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33207",
       "triggerID" : "d4fc20e109703ae85eb8a7786973824a8fb081ac",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208",
       "triggerID" : "dd92512b30f07e472d9e9e5377d0b4e801214d00",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dd92512b30f07e472d9e9e5377d0b4e801214d00 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33208) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19033:
URL: https://github.com/apache/flink/pull/19033#issuecomment-1063699541


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32807",
       "triggerID" : "7d1dbec66de54a97fb4a1a9a9304806bbe4da9af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984",
       "triggerID" : "768a998a8989828f816cbdfa009737529ec9dbd0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 768a998a8989828f816cbdfa009737529ec9dbd0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32984) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

