Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/21 07:02:06 UTC

[GitHub] [flink] fredia opened a new pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

fredia opened a new pull request #19177:
URL: https://github.com/apache/flink/pull/19177


   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
      Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   *This pull request introduces benchmarks to evaluate the performance of rescaling.
   For more details please check [FLINK-23399](https://issues.apache.org/jira/browse/FLINK-23399).*
   
   
   ## Brief change log
      - *Introduces a benchmark that evaluates the performance of rescaling from an existing state snapshot.*
   
   
   ## Verifying this change
   
   *This change added the unit test `RescalingBenchmarkTest`.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / *no* / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980",
       "triggerID" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34044",
       "triggerID" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6eae4e703ea4a777b1d0b92e8a6e81653c4e849a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6eae4e703ea4a777b1d0b92e8a6e81653c4e849a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1012ead6db20e77f4fa95ebcc92fd3f23e59cc11 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34044) 
   * 6eae4e703ea4a777b1d0b92e8a6e81653c4e849a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980",
       "triggerID" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34044",
       "triggerID" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1012ead6db20e77f4fa95ebcc92fd3f23e59cc11 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34044) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87fd557372a0e20a36a6de740a0be0cde5201747 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465) 
   * 57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] Myasuka commented on a change in pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
Myasuka commented on a change in pull request #19177:
URL: https://github.com/apache/flink/pull/19177#discussion_r837251670



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys sent by the source; numberElements and wordLen are used to control the size
+    // of the state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,

Review comment:
       I think we can reduce the parameters to three: `numberOfKeys`, `keyLen`, `valueLen`.
   We can make each key distinct from the others with the given `keyLen`.
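   As a rough illustration of this suggestion (the class and method names below are a sketch, not code from the PR), distinct fixed-length keys and size-controlled values could be produced roughly like this:

   ```java
   import java.util.concurrent.ThreadLocalRandom;

   /** Illustrative only: numberOfKeys distinct keys of keyLen chars, values of valueLen bytes. */
   public class KeyValueSketch {

       /**
        * Zero-pads the index to keyLen characters, so distinct indices yield distinct keys
        * (assumes keyLen is at least the digit count of the largest index).
        */
       static String keyOf(int index, int keyLen) {
           return String.format("%0" + keyLen + "d", index);
       }

       /** Random payload of valueLen bytes; only its size matters for controlling state size. */
       static byte[] valueOf(int valueLen) {
           byte[] value = new byte[valueLen];
           ThreadLocalRandom.current().nextBytes(value);
           return value;
       }

       public static void main(String[] args) {
           int numberOfKeys = 4, keyLen = 8, valueLen = 16;
           for (int i = 0; i < numberOfKeys; i++) {
               System.out.println(keyOf(i, keyLen) + " -> " + valueOf(valueLen).length + " bytes");
           }
       }
   }
   ```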
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] Myasuka commented on a change in pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
Myasuka commented on a change in pull request #19177:
URL: https://github.com/apache/flink/pull/19177#discussion_r839261530



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // numberOfKeys, keyLen and valueLen are used to control the size of state.
+    private final int numberOfKeys;
+    private final int keyLen;
+    private final int valueLen;
+    private final int maxParallelism;
+
+    private final int parallelismBefore;
+    private final int parallelismAfter;
+
+    private final boolean isSafeToReuseState;
+    private final Random random = new Random(0);
+
+    private final StateBackend stateBackend;
+    private final CheckpointStorageAccess checkpointStorageAccess;
+
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState stateForSubtask;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberOfKeys,
+            final int keyLen,
+            final int valueLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final boolean isSafeToReuseState,

Review comment:
       After rethinking how `RescalingBenchmark` should work, I think we should leave how keys are generated to the user who calls this benchmark. Moreover, we should also leave the `KeyedProcessFunction` used to update state open to users; in other words, the `stateProcessFunction` should also be a parameter.
   Last but not least, since the current `RescalingBenchmark` has too many constructor parameters, we had better use a `RescalingBenchmarkBuilder` here.
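   To make the builder idea concrete, here is a hedged sketch of what a caller might write once record generation and the state-updating function are user-supplied (setter and lifecycle names follow the test discussed in this thread; `IntegerRecordGenerator` and `TestKeyedFunction` are assumed caller-provided helpers, and checkpoint-storage/managed-memory settings are omitted for brevity):

   ```java
   import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

   public class RescalingBenchmarkUsageSketch {
       public static void main(String[] args) throws Exception {
           RescalingBenchmark<Integer> benchmark =
                   new RescalingBenchmarkBuilder<Integer>()
                           .setMaxParallelism(128)
                           .setParallelismBefore(2)
                           .setParallelismAfter(1)
                           .setStateBackend(new EmbeddedRocksDBStateBackend(true))
                           // how keys/records are generated is left to the caller
                           .setStreamRecordGenerator(new IntegerRecordGenerator())
                           // the KeyedProcessFunction that updates state is also caller-supplied
                           .setStateProcessFunctionSupplier(TestKeyedFunction::new)
                           .build();
           benchmark.setUp();                             // build the state to rescale from
           benchmark.prepareStateAndHarnessForSubtask(0); // pick the subtask under test
           benchmark.rescale();                           // the measured restore/rescale step
           benchmark.closeHarnessForSubtask();
           benchmark.tearDown();
       }
   }
   ```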




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980",
       "triggerID" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34044",
       "triggerID" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6eae4e703ea4a777b1d0b92e8a6e81653c4e849a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34073",
       "triggerID" : "6eae4e703ea4a777b1d0b92e8a6e81653c4e849a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1012ead6db20e77f4fa95ebcc92fd3f23e59cc11 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34044) 
   * 6eae4e703ea4a777b1d0b92e8a6e81653c4e849a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34073) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87fd557372a0e20a36a6de740a0be0cde5201747 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465) 
   * 57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] fredia commented on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
fredia commented on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1085715866


   CI passed at https://dev.azure.com/fredia/flink/_build/results?buildId=150&view=results


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980",
       "triggerID" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897) 
   * 19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980",
       "triggerID" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] Myasuka merged pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
Myasuka merged pull request #19177:
URL: https://github.com/apache/flink/pull/19177


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980",
       "triggerID" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980) 
   * 1012ead6db20e77f4fa95ebcc92fd3f23e59cc11 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87fd557372a0e20a36a6de740a0be0cde5201747 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] fredia commented on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
fredia commented on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073735240


   @Myasuka Would you please review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] Myasuka commented on a change in pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
Myasuka commented on a change in pull request #19177:
URL: https://github.com/apache/flink/pull/19177#discussion_r840197342



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark<KEY> {
+    private final int maxParallelism;
+
+    private final int parallelismBefore;
+    private final int parallelismAfter;
+
+    private final int managedMemorySize;
+
+    private final StateBackend stateBackend;
+    private final CheckpointStorageAccess checkpointStorageAccess;
+
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState stateForSubtask;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    private final StreamRecordGenerator<KEY> streamRecordGenerator;
+    private final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+
+    public RescalingBenchmark(
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final int managedMemorySize,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess,
+            final StreamRecordGenerator<KEY> streamRecordGenerator,
+            final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier) {
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.managedMemorySize = managedMemorySize;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+        this.streamRecordGenerator = streamRecordGenerator;
+        this.stateProcessFunctionSupplier = stateProcessFunctionSupplier;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /** Rescales one subtask; this is the benchmark entry point. */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(stateForSubtask);
+    }
+
+    /** Closes the test harness for the subtask. */
+    public void closeHarnessForSubtask() throws Exception {

Review comment:
       I think the method could be renamed to `closeOperator`, which reads better, as the name would then not be bound to the implementation.
   Remember to update the related comments.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkBuilder.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Supplier;
+
+/** Builder for rescalingBenchmark. */
+public class RescalingBenchmarkBuilder<KEY> {
+    private int maxParallelism = 128;
+    private int parallelismBefore = 2;
+    private int parallelismAfter = 1;
+    private int managedMemorySize = 512 * 1024 * 1024;
+    private StateBackend stateBackend = new EmbeddedRocksDBStateBackend();
+    private RescalingBenchmark.StreamRecordGenerator<KEY> streamRecordGenerator;
+    private Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+    private CheckpointStorageAccess checkpointStorageAccess;
+
+    public RescalingBenchmarkBuilder setMaxParallelism(int maxParallelism) {

Review comment:
       We had better return `RescalingBenchmarkBuilder<KEY>` here; the same applies to the methods below.
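   A minimal sketch of the suggested signature change (illustrative, not the PR's code): returning the parameterized builder type keeps `<KEY>` through chained calls, while the raw return type loses it.

   ```java
   /** Illustrative self-typed builder; only the return type differs from the raw-typed version. */
   class BuilderSketch<KEY> {
       private int maxParallelism = 128;

       // Returning BuilderSketch<KEY> (not the raw BuilderSketch) preserves KEY when chaining.
       public BuilderSketch<KEY> setMaxParallelism(int maxParallelism) {
           this.maxParallelism = maxParallelism;
           return this;
       }
   }
   ```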

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/** Test Rescaling benchmark. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    private static final Random random = new Random(0);

Review comment:
       Actually, we can use `ThreadLocalRandom.current()` to replace this `random` here, as we do not need deterministic results.
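   For illustration only (a sketch, not the PR's code), the swap could look like this:

   ```java
   import java.util.concurrent.ThreadLocalRandom;

   class RandomPayloadSketch {
       /** No seeded Random field is needed when deterministic results are not required. */
       static byte[] randomBytes(int len) {
           byte[] payload = new byte[len];
           ThreadLocalRandom.current().nextBytes(payload);
           return payload;
       }
   }
   ```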

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/** Test Rescaling benchmark. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    private static final Random random = new Random(0);
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingOut() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmarkBuilder()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(1)
+                        .setParallelismAfter(2)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(() -> new TestKeyedFunction())
+                        .build();
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeHarnessForSubtask();
+        benchmark.tearDown();
+    }
+
+    @Test
+    public void testScalingIn() throws Exception {
+        RescalingBenchmark benchmark =
+                new RescalingBenchmarkBuilder()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(2)
+                        .setParallelismAfter(1)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(() -> new TestKeyedFunction())
+                        .build();
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeHarnessForSubtask();
+        benchmark.tearDown();
+    }
+
+    @NotThreadSafe

Review comment:
       Since we only generate and process the records one by one, why do we have to add such an annotation here?
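
   A minimal, hypothetical sketch of the situation the question refers to (the class and field names below are illustrative, not the PR's actual `IntegerRecordGenerator`): JSR-305's `@NotThreadSafe` is documentation only and has no runtime effect, so it only matters if an instance could ever be shared across threads.

   ```java
   import javax.annotation.concurrent.NotThreadSafe;

   // Documentation-only annotation: it signals that the mutable cursor below must not
   // be shared between threads, but nothing enforces this at runtime.
   @NotThreadSafe
   class SequentialIntGenerator {
       private int next; // mutable iteration state

       int nextValue() {
           return next++;
       }
   }
   ```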

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from savepoint. */

Review comment:
       ```suggestion
   /** The benchmark of rescaling from checkpoint. */
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark<KEY> {
+    private final int maxParallelism;
+
+    private final int parallelismBefore;
+    private final int parallelismAfter;
+
+    private final int managedMemorySize;
+
+    private final StateBackend stateBackend;
+    private final CheckpointStorageAccess checkpointStorageAccess;
+
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState stateForSubtask;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    private final StreamRecordGenerator<KEY> streamRecordGenerator;
+    private final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+
+    public RescalingBenchmark(
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final int managedMemorySize,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess,
+            final StreamRecordGenerator<KEY> streamRecordGenerator,
+            final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier) {
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.managedMemorySize = managedMemorySize;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+        this.streamRecordGenerator = streamRecordGenerator;
+        this.stateProcessFunctionSupplier = stateProcessFunctionSupplier;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /** rescaling on one subtask, this is the benchmark entrance. */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(stateForSubtask);
+    }
+
+    /** close test harness for subtask. */
+    public void closeHarnessForSubtask() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {

Review comment:
       Similarly, I think the method could be renamed to `prepareStateForOperator`.
   Remember to update the related comments as well.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/** Test Rescaling benchmark. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    private static final Random random = new Random(0);
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingOut() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmarkBuilder()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(1)
+                        .setParallelismAfter(2)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(() -> new TestKeyedFunction())
+                        .build();

Review comment:
       ```suggestion
           RescalingBenchmark<Integer> benchmark =
                   new RescalingBenchmarkBuilder<Integer>()
                           .setMaxParallelism(128)
                           .setParallelismBefore(1)
                           .setParallelismAfter(2)
                           .setManagedMemorySize(512 * 1024 * 1024)
                           .setCheckpointStorageAccess(
                                   new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
                                           .createCheckpointStorage(new JobID()))
                           .setStateBackend(new EmbeddedRocksDBStateBackend(true))
                           .setStreamRecordGenerator(new IntegerRecordGenerator())
                           .setStateProcessFunctionSupplier(TestKeyedFunction::new)
                           .build();
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852) 
   * 0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] Myasuka commented on a change in pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
Myasuka commented on a change in pull request #19177:
URL: https://github.com/apache/flink/pull/19177#discussion_r837027311



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state

Review comment:
       ```suggestion
       // number of keys sent by source, numberElements and wordLen are used
       // to control the size of state.
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();
+        }
+
+        KeySelector<String, String> keySelector = value -> value;
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessBefore =
+                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+        OperatorSubtaskState snapshot = null;
+
+        try (KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
+                getTestHarness(keySelector, maxParallelism, 1, 0)) {
+            harness.setStateBackend(stateBackend);
+            harness.open();
+            for (int i = 0; i < numberElements; i++) {
+                harness.processElement(new StreamRecord<>(covertToString(i, fatArray), 0));
+            }
+            snapshot = harness.snapshot(0, 1);
+
+            OperatorSubtaskState[] subtaskStates = new OperatorSubtaskState[parallelismBefore];
+            for (int i = 0; i < parallelismBefore; i++) {
+                OperatorSubtaskState subtaskState =
+                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                                snapshot, maxParallelism, 1, parallelismBefore, i);
+
+                harnessBefore[i] =
+                        getTestHarness(keySelector, maxParallelism, parallelismBefore, i);
+                harnessBefore[i].setStateBackend(stateBackend);
+                harnessBefore[i].setup();
+                harnessBefore[i].initializeState(subtaskState);
+                harnessBefore[i].open();
+                subtaskStates[i] = harnessBefore[i].snapshot(1, 2);
+            }
+            return AbstractStreamOperatorTestHarness.repackageState(subtaskStates);
+        } finally {
+            closeHarness(harnessBefore);
+        }
+    }
+
+    private String covertToString(int number, char[] fatArray) {
+        String a = String.valueOf(number);
+        StringBuilder builder = new StringBuilder(wordLen);
+        builder.append(a);
+        builder.append(fatArray, 0, wordLen - a.length());
+        return builder.toString();
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<String, String, Integer> getTestHarness(
+            KeySelector<String, String> keySelector,
+            int maxParallelism,
+            int taskParallelism,
+            int subtaskIdx)
+            throws Exception {
+        MockEnvironment env =
+                new MockEnvironmentBuilder()
+                        .setTaskName("RescalingTask")
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setInputSplitProvider(new MockInputSplitProvider())
+                        .setBufferSize(1024)
+                        .setMaxParallelism(maxParallelism)
+                        .setParallelism(taskParallelism)
+                        .setSubtaskIndex(subtaskIdx)
+                        .build();
+        env.setCheckpointStorageAccess(checkpointStorageAccess);
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(new TestKeyedFunction()),
+                keySelector,
+                BasicTypeInfo.STRING_TYPE_INFO,
+                env);
+    }
+
+    private void closeHarness(KeyedOneInputStreamOperatorTestHarness<?, ?, ?>[] harnessArr)
+            throws Exception {
+        for (KeyedOneInputStreamOperatorTestHarness<?, ?, ?> harness : harnessArr) {
+            if (harness != null) {
+                harness.close();
+            }
+        }
+    }
+
+    private static class TestKeyedFunction extends KeyedProcessFunction<String, String, Integer> {

Review comment:
       This function lacks a `serialVersionUID`.
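
   A sketch of the fix being asked for; the concrete value of the field is an assumption (any fixed long works), and the state handling from the diff is trimmed to keep the example short.

   ```java
   import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
   import org.apache.flink.util.Collector;

   /** Sketch: a serializable test function that declares an explicit serialVersionUID. */
   class VersionedTestKeyedFunction extends KeyedProcessFunction<String, String, Integer> {

       private static final long serialVersionUID = 1L; // value is arbitrary, but now explicit

       @Override
       public void processElement(String value, Context ctx, Collector<Integer> out)
               throws Exception {
           out.collect(1); // state access omitted; only the versioning detail matters here
       }
   }
   ```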

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Test Rescaling benchmark. */
+@RunWith(Parameterized.class)
+public class RescalingBenchmarkTest extends TestLogger {
+    private final int numberElements = 1000;
+    private int wordLen = 32;
+    private int maxParallelism = 13;
+
+    @Parameterized.Parameters(name = "stateBackendType = {0}")
+    public static Object[] data() {
+        return new Object[] {"rocksdbIncremental"};
+    }
+
+    @Parameterized.Parameter public static String stateBackendType;
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingUp() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmark(
+                        numberElements,
+                        wordLen,
+                        3,
+                        4,
+                        maxParallelism,
+                        getStateBackend(stateBackendType),
+                        new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                .createCheckpointStorage(new JobID()));
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeSubtaskHarness();
+        benchmark.tearDown();
+    }
+
+    @Test
+    public void testScalingDown() throws Exception {
+        RescalingBenchmark benchmark =
+                new RescalingBenchmark(
+                        numberElements,
+                        wordLen,
+                        7,
+                        3,
+                        maxParallelism,
+                        getStateBackend(stateBackendType),
+                        new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                .createCheckpointStorage(new JobID()));
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeSubtaskHarness();
+        benchmark.tearDown();
+    }
+
+    private StateBackend getStateBackend(String stateBackendType) {
+        switch (stateBackendType) {
+            case "rocksdbIncremental":
+                return new EmbeddedRocksDBStateBackend(true);
+            case "rocksdb":
+                return new EmbeddedRocksDBStateBackend(false);
+            default:
+                throw new IllegalArgumentException(
+                        "Unknown state backend type: " + stateBackendType);
+        }
+    }

Review comment:
       I think just one kind of state backend is enough.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);

Review comment:
       I think we should add a comment here noting that the source is deterministic.
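
   A sketch of the comment the reviewer has in mind, mirroring the loop from the diff above (`wordLen` is assumed to be in scope):

   ```java
   // The source data is deterministic: the fixed seed means every benchmark run
   // generates exactly the same key payloads, so results are comparable across runs.
   char[] fatArray = new char[wordLen];
   Random random = new Random(0); // fixed seed on purpose
   for (int i = 0; i < fatArray.length; i++) {
       fatArray[i] = (char) random.nextInt();
   }
   ```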

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();
+        }
+
+        KeySelector<String, String> keySelector = value -> value;
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessBefore =
+                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+        OperatorSubtaskState snapshot = null;
+
+        try (KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
+                getTestHarness(keySelector, maxParallelism, 1, 0)) {
+            harness.setStateBackend(stateBackend);
+            harness.open();
+            for (int i = 0; i < numberElements; i++) {
+                harness.processElement(new StreamRecord<>(covertToString(i, fatArray), 0));
+            }
+            snapshot = harness.snapshot(0, 1);
+
+            OperatorSubtaskState[] subtaskStates = new OperatorSubtaskState[parallelismBefore];
+            for (int i = 0; i < parallelismBefore; i++) {
+                OperatorSubtaskState subtaskState =
+                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                                snapshot, maxParallelism, 1, parallelismBefore, i);
+
+                harnessBefore[i] =
+                        getTestHarness(keySelector, maxParallelism, parallelismBefore, i);
+                harnessBefore[i].setStateBackend(stateBackend);
+                harnessBefore[i].setup();
+                harnessBefore[i].initializeState(subtaskState);
+                harnessBefore[i].open();
+                subtaskStates[i] = harnessBefore[i].snapshot(1, 2);
+            }
+            return AbstractStreamOperatorTestHarness.repackageState(subtaskStates);
+        } finally {
+            closeHarness(harnessBefore);
+        }
+    }
+
+    private String covertToString(int number, char[] fatArray) {
+        String a = String.valueOf(number);
+        StringBuilder builder = new StringBuilder(wordLen);
+        builder.append(a);
+        builder.append(fatArray, 0, wordLen - a.length());
+        return builder.toString();
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<String, String, Integer> getTestHarness(

Review comment:
       Since we just need to fill up the state, I think we'd better use `byte[]` as the key and value type for the state, as `BytePrimitiveArraySerializer` costs much less CPU than `StringSerializer`.
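
   A sketch of what the value side of that change could look like; the descriptor name is reused from the diff, and whether the key type can be switched as well is left open here.

   ```java
   import org.apache.flink.api.common.state.ValueStateDescriptor;
   import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;

   class ByteArrayStateSketch {
       // byte[] values are handled by BytePrimitiveArraySerializer, which is cheaper
       // to (de)serialize than Strings going through StringSerializer.
       static ValueStateDescriptor<byte[]> byteArrayCounterDescriptor() {
           return new ValueStateDescriptor<>(
                   "counter", PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
       }
   }
   ```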

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */

Review comment:
       ```suggestion
        * rescaling on one subtask, this is the benchmark entrance.
        */
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;

Review comment:
       Some private fields could be `final`.
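
   A small sketch of the idiom the comment points at (constructor and fields abbreviated from the diff):

   ```java
   import org.apache.flink.runtime.state.StateBackend;

   class RescalingBenchmarkFieldsSketch {
       // Assigned exactly once in the constructor, so they can be final.
       private final int maxParallelism;
       private final StateBackend stateBackend;

       RescalingBenchmarkFieldsSketch(int maxParallelism, StateBackend stateBackend) {
           this.maxParallelism = maxParallelism;
           this.stateBackend = stateBackend;
       }
   }
   ```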

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
##########
@@ -64,7 +64,13 @@ public MockStreamTaskBuilder(Environment environment) throws Exception {
         this.config = new StreamConfig(environment.getTaskConfiguration());
 
         MemoryStateBackend stateBackend = new MemoryStateBackend();
-        this.checkpointStorage = stateBackend.createCheckpointStorage(new JobID());
+
+        try {
+            this.checkpointStorage = environment.getCheckpointStorageAccess();
+        } catch (Exception e) {
+            this.checkpointStorage = stateBackend.createCheckpointStorage(new JobID());
+        }
+

Review comment:
       We don't need to change this, as `checkpointStorage` would be overridden in the setter below.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();
+        }
+
+        KeySelector<String, String> keySelector = value -> value;
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessBefore =
+                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+        OperatorSubtaskState snapshot = null;
+
+        try (KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
+                getTestHarness(keySelector, maxParallelism, 1, 0)) {
+            harness.setStateBackend(stateBackend);
+            harness.open();
+            for (int i = 0; i < numberElements; i++) {
+                harness.processElement(new StreamRecord<>(covertToString(i, fatArray), 0));
+            }
+            snapshot = harness.snapshot(0, 1);
+
+            OperatorSubtaskState[] subtaskStates = new OperatorSubtaskState[parallelismBefore];
+            for (int i = 0; i < parallelismBefore; i++) {
+                OperatorSubtaskState subtaskState =
+                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                                snapshot, maxParallelism, 1, parallelismBefore, i);
+
+                harnessBefore[i] =
+                        getTestHarness(keySelector, maxParallelism, parallelismBefore, i);
+                harnessBefore[i].setStateBackend(stateBackend);
+                harnessBefore[i].setup();
+                harnessBefore[i].initializeState(subtaskState);
+                harnessBefore[i].open();
+                subtaskStates[i] = harnessBefore[i].snapshot(1, 2);
+            }
+            return AbstractStreamOperatorTestHarness.repackageState(subtaskStates);
+        } finally {
+            closeHarness(harnessBefore);
+        }
+    }
+
+    private String covertToString(int number, char[] fatArray) {
+        String a = String.valueOf(number);
+        StringBuilder builder = new StringBuilder(wordLen);
+        builder.append(a);
+        builder.append(fatArray, 0, wordLen - a.length());
+        return builder.toString();
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<String, String, Integer> getTestHarness(
+            KeySelector<String, String> keySelector,
+            int maxParallelism,
+            int taskParallelism,
+            int subtaskIdx)
+            throws Exception {
+        MockEnvironment env =
+                new MockEnvironmentBuilder()
+                        .setTaskName("RescalingTask")
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setInputSplitProvider(new MockInputSplitProvider())
+                        .setBufferSize(1024)
+                        .setMaxParallelism(maxParallelism)
+                        .setParallelism(taskParallelism)
+                        .setSubtaskIndex(subtaskIdx)
+                        .build();
+        env.setCheckpointStorageAccess(checkpointStorageAccess);
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(new TestKeyedFunction()),
+                keySelector,
+                BasicTypeInfo.STRING_TYPE_INFO,
+                env);
+    }
+
+    private void closeHarness(KeyedOneInputStreamOperatorTestHarness<?, ?, ?>[] harnessArr)
+            throws Exception {
+        for (KeyedOneInputStreamOperatorTestHarness<?, ?, ?> harness : harnessArr) {
+            if (harness != null) {
+                harness.close();
+            }
+        }
+    }
+
+    private static class TestKeyedFunction extends KeyedProcessFunction<String, String, Integer> {
+
+        private ValueState<Integer> counterState;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            counterState =
+                    this.getRuntimeContext()
+                            .getState(new ValueStateDescriptor<>("counter", Integer.class));
+        }
+
+        @Override
+        public void processElement(String value, Context ctx, Collector<Integer> out)
+                throws Exception {
+            Integer count = Integer.valueOf(1);
+            counterState.update(count);
+            out.collect(count);

Review comment:
       We can make the output type `Void` to avoid the `out.collect` call.
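
       For illustration, a minimal sketch of how that could look (assuming the suggestion is adopted; the `Void` output type and the dropped `out.collect` are not part of the current diff):

       ```java
       private static class TestKeyedFunction extends KeyedProcessFunction<String, String, Void> {

           private ValueState<Integer> counterState;

           @Override
           public void open(Configuration parameters) throws Exception {
               super.open(parameters);
               counterState =
                       getRuntimeContext()
                               .getState(new ValueStateDescriptor<>("counter", Integer.class));
           }

           @Override
           public void processElement(String value, Context ctx, Collector<Void> out)
                   throws Exception {
               // Only touch keyed state; nothing is emitted downstream, so the collector is unused.
               counterState.update(1);
           }
       }
       ```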

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Test Rescaling benchmark. */
+@RunWith(Parameterized.class)
+public class RescalingBenchmarkTest extends TestLogger {
+    private final int numberElements = 1000;
+    private int wordLen = 32;
+    private int maxParallelism = 13;
+
+    @Parameterized.Parameters(name = "stateBackendType = {0}")
+    public static Object[] data() {
+        return new Object[] {"rocksdbIncremental"};
+    }
+
+    @Parameterized.Parameter public static String stateBackendType;
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingUp() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmark(
+                        numberElements,
+                        wordLen,
+                        3,
+                        4,
+                        maxParallelism,
+                        getStateBackend(stateBackendType),
+                        new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                .createCheckpointStorage(new JobID()));
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeSubtaskHarness();
+        benchmark.tearDown();
+    }
+
+    @Test
+    public void testScalingDown() throws Exception {

Review comment:
       It should be `testScaleIn`.
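
       A hedged sketch of what the renamed scale-in test could look like, mirroring `testScalingUp` above with the parallelisms swapped (the 4 -> 3 values are illustrative and not taken from the diff):

       ```java
       @Test
       public void testScaleIn() throws Exception {
           RescalingBenchmark benchmark =
                   new RescalingBenchmark(
                           numberElements,
                           wordLen,
                           4, // parallelism before rescaling
                           3, // parallelism after rescaling
                           maxParallelism,
                           getStateBackend(stateBackendType),
                           new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
                                   .createCheckpointStorage(new JobID()));
           benchmark.setUp();
           benchmark.prepareStateAndHarnessForSubtask(0);
           benchmark.rescale();
           benchmark.closeSubtaskHarness();
           benchmark.tearDown();
       }
       ```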

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys sent by the source; numberElements and wordLen are used to control the
+    // size of the state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();

Review comment:
       I think this is not good: the range of `char` is only 0 to 65535, while `Random#nextInt()` returns an arbitrary `int` (including negative values), so the cast silently wraps the value.
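
       As an illustration of the point (one possible fix, not necessarily the one that should be applied), the filler characters could be drawn from a bounded, printable range so the cast can never wrap:

       ```java
       char[] fatArray = new char[wordLen];
       Random random = new Random(0);
       for (int i = 0; i < fatArray.length; i++) {
           // Bound the random value to 'a'..'z' so it always fits into a char.
           fatArray[i] = (char) ('a' + random.nextInt(26));
       }
       ```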

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Test Rescaling benchmark. */
+@RunWith(Parameterized.class)
+public class RescalingBenchmarkTest extends TestLogger {
+    private final int numberElements = 1000;
+    private int wordLen = 32;
+    private int maxParallelism = 13;
+
+    @Parameterized.Parameters(name = "stateBackendType = {0}")
+    public static Object[] data() {
+        return new Object[] {"rocksdbIncremental"};
+    }
+
+    @Parameterized.Parameter public static String stateBackendType;
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingUp() throws Exception {

Review comment:
       It should be `testScaleOut`.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys sent by the source; numberElements and wordLen are used to control the
+    // size of the state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();
+        }
+
+        KeySelector<String, String> keySelector = value -> value;
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessBefore =
+                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+        OperatorSubtaskState snapshot = null;

Review comment:
       Why not just assign to `snapshot` within the try-block?
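
       A hedged sketch of `prepareState` with that change applied (everything else copied from the diff above; only the declaration of `snapshot` moves):

       ```java
       private OperatorSubtaskState prepareState() throws Exception {
           char[] fatArray = new char[wordLen];
           Random random = new Random(0);
           for (int i = 0; i < fatArray.length; i++) {
               fatArray[i] = (char) random.nextInt();
           }

           KeySelector<String, String> keySelector = value -> value;
           KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessBefore =
                   new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];

           try (KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
                   getTestHarness(keySelector, maxParallelism, 1, 0)) {
               harness.setStateBackend(stateBackend);
               harness.open();
               for (int i = 0; i < numberElements; i++) {
                   harness.processElement(new StreamRecord<>(covertToString(i, fatArray), 0));
               }
               // Declared where it is first assigned, so the `= null` initialization goes away.
               OperatorSubtaskState snapshot = harness.snapshot(0, 1);

               OperatorSubtaskState[] subtaskStates = new OperatorSubtaskState[parallelismBefore];
               for (int i = 0; i < parallelismBefore; i++) {
                   OperatorSubtaskState subtaskState =
                           AbstractStreamOperatorTestHarness.repartitionOperatorState(
                                   snapshot, maxParallelism, 1, parallelismBefore, i);

                   harnessBefore[i] =
                           getTestHarness(keySelector, maxParallelism, parallelismBefore, i);
                   harnessBefore[i].setStateBackend(stateBackend);
                   harnessBefore[i].setup();
                   harnessBefore[i].initializeState(subtaskState);
                   harnessBefore[i].open();
                   subtaskStates[i] = harnessBefore[i].snapshot(1, 2);
               }
               return AbstractStreamOperatorTestHarness.repackageState(subtaskStates);
           } finally {
               closeHarness(harnessBefore);
           }
       }
       ```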

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
##########
@@ -305,6 +305,12 @@ private AbstractStreamOperatorTestHarness(
 
         this.taskMailbox = new TaskMailboxImpl();
 
+        try {
+            this.checkpointStorageAccess = environment.getCheckpointStorageAccess();
+        } catch (Exception e) {
+            // do nothing, use default value
+        }
+

Review comment:
       ```suggestion
           // TODO remove this once we introduce AbstractStreamOperatorTestHarnessBuilder.
           try {
               this.checkpointStorageAccess = environment.getCheckpointStorageAccess();
           } catch (NullPointerException | UnsupportedOperationException e) {
               // cannot get checkpoint storage from environment, use default one.
           }
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897) 
   * 19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87fd557372a0e20a36a6de740a0be0cde5201747 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87fd557372a0e20a36a6de740a0be0cde5201747 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e56195ded8b51d1362bff479d230dc6e641d9f21",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e56195ded8b51d1362bff479d230dc6e641d9f21",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852) 
   * e56195ded8b51d1362bff479d230dc6e641d9f21 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550) 
   * fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550) 
   * fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852) 
   * 0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19177:
URL: https://github.com/apache/flink/pull/19177#issuecomment-1073539671


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33465",
       "triggerID" : "87fd557372a0e20a36a6de740a0be0cde5201747",
       "triggerType" : "PUSH"
     }, {
       "hash" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33550",
       "triggerID" : "57894a9e0d0a2e3a9892f9c712b83a09aaf3ed42",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33852",
       "triggerID" : "fc94f6f4b2ea3beed5683b9fd44cc8364a1a5cd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33897",
       "triggerID" : "0ec5ceb959985fe813b47b7fbc0d3c5f9138b9ec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980",
       "triggerID" : "19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34044",
       "triggerID" : "1012ead6db20e77f4fa95ebcc92fd3f23e59cc11",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 19ebdf8168d4ff2980379f4bfab2f33e1aa7bcdd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33980) 
   * 1012ead6db20e77f4fa95ebcc92fd3f23e59cc11 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34044) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

