You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/29 07:08:18 UTC

[GitHub] [flink] Myasuka commented on a change in pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Myasuka commented on a change in pull request #19177:
URL: https://github.com/apache/flink/pull/19177#discussion_r837027311



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state

Review comment:
       ```suggestion
       // number of keys sent by source, numberElements and wordLen are used
       // to control the size of state.
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();
+        }
+
+        KeySelector<String, String> keySelector = value -> value;
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessBefore =
+                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+        OperatorSubtaskState snapshot = null;
+
+        try (KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
+                getTestHarness(keySelector, maxParallelism, 1, 0)) {
+            harness.setStateBackend(stateBackend);
+            harness.open();
+            for (int i = 0; i < numberElements; i++) {
+                harness.processElement(new StreamRecord<>(covertToString(i, fatArray), 0));
+            }
+            snapshot = harness.snapshot(0, 1);
+
+            OperatorSubtaskState[] subtaskStates = new OperatorSubtaskState[parallelismBefore];
+            for (int i = 0; i < parallelismBefore; i++) {
+                OperatorSubtaskState subtaskState =
+                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                                snapshot, maxParallelism, 1, parallelismBefore, i);
+
+                harnessBefore[i] =
+                        getTestHarness(keySelector, maxParallelism, parallelismBefore, i);
+                harnessBefore[i].setStateBackend(stateBackend);
+                harnessBefore[i].setup();
+                harnessBefore[i].initializeState(subtaskState);
+                harnessBefore[i].open();
+                subtaskStates[i] = harnessBefore[i].snapshot(1, 2);
+            }
+            return AbstractStreamOperatorTestHarness.repackageState(subtaskStates);
+        } finally {
+            closeHarness(harnessBefore);
+        }
+    }
+
+    private String covertToString(int number, char[] fatArray) {
+        String a = String.valueOf(number);
+        StringBuilder builder = new StringBuilder(wordLen);
+        builder.append(a);
+        builder.append(fatArray, 0, wordLen - a.length());
+        return builder.toString();
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<String, String, Integer> getTestHarness(
+            KeySelector<String, String> keySelector,
+            int maxParallelism,
+            int taskParallelism,
+            int subtaskIdx)
+            throws Exception {
+        MockEnvironment env =
+                new MockEnvironmentBuilder()
+                        .setTaskName("RescalingTask")
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setInputSplitProvider(new MockInputSplitProvider())
+                        .setBufferSize(1024)
+                        .setMaxParallelism(maxParallelism)
+                        .setParallelism(taskParallelism)
+                        .setSubtaskIndex(subtaskIdx)
+                        .build();
+        env.setCheckpointStorageAccess(checkpointStorageAccess);
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(new TestKeyedFunction()),
+                keySelector,
+                BasicTypeInfo.STRING_TYPE_INFO,
+                env);
+    }
+
+    private void closeHarness(KeyedOneInputStreamOperatorTestHarness<?, ?, ?>[] harnessArr)
+            throws Exception {
+        for (KeyedOneInputStreamOperatorTestHarness<?, ?, ?> harness : harnessArr) {
+            if (harness != null) {
+                harness.close();
+            }
+        }
+    }
+
+    private static class TestKeyedFunction extends KeyedProcessFunction<String, String, Integer> {

Review comment:
       This function lacks a `serialVersionUID`. Flink functions are `Serializable`, so it should declare one explicitly.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Test Rescaling benchmark. */
+@RunWith(Parameterized.class)
+public class RescalingBenchmarkTest extends TestLogger {
+    private final int numberElements = 1000;
+    private int wordLen = 32;
+    private int maxParallelism = 13;
+
+    @Parameterized.Parameters(name = "stateBackendType = {0}")
+    public static Object[] data() {
+        return new Object[] {"rocksdbIncremental"};
+    }
+
+    @Parameterized.Parameter public static String stateBackendType;
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingUp() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmark(
+                        numberElements,
+                        wordLen,
+                        3,
+                        4,
+                        maxParallelism,
+                        getStateBackend(stateBackendType),
+                        new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                .createCheckpointStorage(new JobID()));
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeSubtaskHarness();
+        benchmark.tearDown();
+    }
+
+    @Test
+    public void testScalingDown() throws Exception {
+        RescalingBenchmark benchmark =
+                new RescalingBenchmark(
+                        numberElements,
+                        wordLen,
+                        7,
+                        3,
+                        maxParallelism,
+                        getStateBackend(stateBackendType),
+                        new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                .createCheckpointStorage(new JobID()));
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeSubtaskHarness();
+        benchmark.tearDown();
+    }
+
+    private StateBackend getStateBackend(String stateBackendType) {
+        switch (stateBackendType) {
+            case "rocksdbIncremental":
+                return new EmbeddedRocksDBStateBackend(true);
+            case "rocksdb":
+                return new EmbeddedRocksDBStateBackend(false);
+            default:
+                throw new IllegalArgumentException(
+                        "Unknown state backend type: " + stateBackendType);
+        }
+    }

Review comment:
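       I think one kind of state backend is enough here: only `rocksdbIncremental` is used, so the parameterization and this `switch` can be dropped in favor of a single backend.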

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);

Review comment:
       I think we should add a comment here explaining that the source is deterministic (the `Random` is created with a fixed seed).

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();
+        }
+
+        KeySelector<String, String> keySelector = value -> value;
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessBefore =
+                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+        OperatorSubtaskState snapshot = null;
+
+        try (KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
+                getTestHarness(keySelector, maxParallelism, 1, 0)) {
+            harness.setStateBackend(stateBackend);
+            harness.open();
+            for (int i = 0; i < numberElements; i++) {
+                harness.processElement(new StreamRecord<>(covertToString(i, fatArray), 0));
+            }
+            snapshot = harness.snapshot(0, 1);
+
+            OperatorSubtaskState[] subtaskStates = new OperatorSubtaskState[parallelismBefore];
+            for (int i = 0; i < parallelismBefore; i++) {
+                OperatorSubtaskState subtaskState =
+                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                                snapshot, maxParallelism, 1, parallelismBefore, i);
+
+                harnessBefore[i] =
+                        getTestHarness(keySelector, maxParallelism, parallelismBefore, i);
+                harnessBefore[i].setStateBackend(stateBackend);
+                harnessBefore[i].setup();
+                harnessBefore[i].initializeState(subtaskState);
+                harnessBefore[i].open();
+                subtaskStates[i] = harnessBefore[i].snapshot(1, 2);
+            }
+            return AbstractStreamOperatorTestHarness.repackageState(subtaskStates);
+        } finally {
+            closeHarness(harnessBefore);
+        }
+    }
+
+    private String covertToString(int number, char[] fatArray) {
+        String a = String.valueOf(number);
+        StringBuilder builder = new StringBuilder(wordLen);
+        builder.append(a);
+        builder.append(fatArray, 0, wordLen - a.length());
+        return builder.toString();
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<String, String, Integer> getTestHarness(

Review comment:
       Since we just need to fill up the state, I think we'd better use `byte[]` as the key and value type for the state, as the `BytePrimitiveArraySerializer` would cost much less CPU than the `StringSerializer`.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */

Review comment:
       ```suggestion
        * Rescaling on one subtask; this is the benchmark entry point.
        */
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;

Review comment:
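       Some private fields could be `final`, e.g. the ones that are only assigned in the constructor (`numberElements`, `wordLen`, `maxParallelism`, `parallelismBefore`, `parallelismAfter`, `stateBackend`, `checkpointStorageAccess`).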

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
##########
@@ -64,7 +64,13 @@ public MockStreamTaskBuilder(Environment environment) throws Exception {
         this.config = new StreamConfig(environment.getTaskConfiguration());
 
         MemoryStateBackend stateBackend = new MemoryStateBackend();
-        this.checkpointStorage = stateBackend.createCheckpointStorage(new JobID());
+
+        try {
+            this.checkpointStorage = environment.getCheckpointStorageAccess();
+        } catch (Exception e) {
+            this.checkpointStorage = stateBackend.createCheckpointStorage(new JobID());
+        }
+

Review comment:
       We don't need to change this, as `checkpointStorage` will be overridden by the setter below.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();
+        }
+
+        KeySelector<String, String> keySelector = value -> value;
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessBefore =
+                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+        OperatorSubtaskState snapshot = null;
+
+        try (KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
+                getTestHarness(keySelector, maxParallelism, 1, 0)) {
+            harness.setStateBackend(stateBackend);
+            harness.open();
+            for (int i = 0; i < numberElements; i++) {
+                harness.processElement(new StreamRecord<>(covertToString(i, fatArray), 0));
+            }
+            snapshot = harness.snapshot(0, 1);
+
+            OperatorSubtaskState[] subtaskStates = new OperatorSubtaskState[parallelismBefore];
+            for (int i = 0; i < parallelismBefore; i++) {
+                OperatorSubtaskState subtaskState =
+                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                                snapshot, maxParallelism, 1, parallelismBefore, i);
+
+                harnessBefore[i] =
+                        getTestHarness(keySelector, maxParallelism, parallelismBefore, i);
+                harnessBefore[i].setStateBackend(stateBackend);
+                harnessBefore[i].setup();
+                harnessBefore[i].initializeState(subtaskState);
+                harnessBefore[i].open();
+                subtaskStates[i] = harnessBefore[i].snapshot(1, 2);
+            }
+            return AbstractStreamOperatorTestHarness.repackageState(subtaskStates);
+        } finally {
+            closeHarness(harnessBefore);
+        }
+    }
+
+    private String covertToString(int number, char[] fatArray) {
+        String a = String.valueOf(number);
+        StringBuilder builder = new StringBuilder(wordLen);
+        builder.append(a);
+        builder.append(fatArray, 0, wordLen - a.length());
+        return builder.toString();
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<String, String, Integer> getTestHarness(
+            KeySelector<String, String> keySelector,
+            int maxParallelism,
+            int taskParallelism,
+            int subtaskIdx)
+            throws Exception {
+        MockEnvironment env =
+                new MockEnvironmentBuilder()
+                        .setTaskName("RescalingTask")
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setInputSplitProvider(new MockInputSplitProvider())
+                        .setBufferSize(1024)
+                        .setMaxParallelism(maxParallelism)
+                        .setParallelism(taskParallelism)
+                        .setSubtaskIndex(subtaskIdx)
+                        .build();
+        env.setCheckpointStorageAccess(checkpointStorageAccess);
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(new TestKeyedFunction()),
+                keySelector,
+                BasicTypeInfo.STRING_TYPE_INFO,
+                env);
+    }
+
+    private void closeHarness(KeyedOneInputStreamOperatorTestHarness<?, ?, ?>[] harnessArr)
+            throws Exception {
+        for (KeyedOneInputStreamOperatorTestHarness<?, ?, ?> harness : harnessArr) {
+            if (harness != null) {
+                harness.close();
+            }
+        }
+    }
+
+    private static class TestKeyedFunction extends KeyedProcessFunction<String, String, Integer> {
+
+        private ValueState<Integer> counterState;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            counterState =
+                    this.getRuntimeContext()
+                            .getState(new ValueStateDescriptor<>("counter", Integer.class));
+        }
+
+        @Override
+        public void processElement(String value, Context ctx, Collector<Integer> out)
+                throws Exception {
+            Integer count = Integer.valueOf(1);
+            counterState.update(count);
+            out.collect(count);

Review comment:
       We can make the output type `Void` to avoid the `out.collect` call.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Test Rescaling benchmark. */
+@RunWith(Parameterized.class)
+public class RescalingBenchmarkTest extends TestLogger {
+    private final int numberElements = 1000;
+    private int wordLen = 32;
+    private int maxParallelism = 13;
+
+    @Parameterized.Parameters(name = "stateBackendType = {0}")
+    public static Object[] data() {
+        return new Object[] {"rocksdbIncremental"};
+    }
+
+    @Parameterized.Parameter public static String stateBackendType;
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingUp() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmark(
+                        numberElements,
+                        wordLen,
+                        3,
+                        4,
+                        maxParallelism,
+                        getStateBackend(stateBackendType),
+                        new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                .createCheckpointStorage(new JobID()));
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeSubtaskHarness();
+        benchmark.tearDown();
+    }
+
+    @Test
+    public void testScalingDown() throws Exception {

Review comment:
       It should be `testScaleIn`.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();

Review comment:
       I don't think this is good: the range of `char` is 0 to 65535, while `random.nextInt()` can return any `int` value, so the `(char)` cast silently keeps only the lower 16 bits.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Test Rescaling benchmark. */
+@RunWith(Parameterized.class)
+public class RescalingBenchmarkTest extends TestLogger {
+    private final int numberElements = 1000;
+    private int wordLen = 32;
+    private int maxParallelism = 13;
+
+    @Parameterized.Parameters(name = "stateBackendType = {0}")
+    public static Object[] data() {
+        return new Object[] {"rocksdbIncremental"};
+    }
+
+    @Parameterized.Parameter public static String stateBackendType;
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingUp() throws Exception {

Review comment:
       It should be `testScaleOut`.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.Random;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark {
+    // number of keys send by source, numberElements adn wordLen are used to control the size of
+    // state
+    private int numberElements;
+    private int wordLen; // length of key
+    private int maxParallelism;
+
+    private int parallelismBefore; // before rescaling
+    private int parallelismAfter; // after rescaling
+    private StateBackend stateBackend;
+    private CheckpointStorageAccess checkpointStorageAccess;
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState subtaskState;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    public RescalingBenchmark(
+            final int numberElements,
+            final int wordLen,
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess) {
+        this.numberElements = numberElements;
+        this.wordLen = wordLen;
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /**
+     * rescaling on one subtask.
+     *
+     * @throws Exception
+     */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(subtaskState);
+    }
+
+    /** close harness for subtask. */
+    public void closeSubtaskHarness() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {
+        subtaskState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness =
+                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+        char[] fatArray = new char[wordLen];
+        Random random = new Random(0);
+        for (int i = 0; i < fatArray.length; i++) {
+            fatArray[i] = (char) random.nextInt();
+        }
+
+        KeySelector<String, String> keySelector = value -> value;
+        KeyedOneInputStreamOperatorTestHarness<String, String, Integer>[] harnessBefore =
+                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+        OperatorSubtaskState snapshot = null;

Review comment:
       Why not just assign to `snapshot` within the try-block?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
##########
@@ -305,6 +305,12 @@ private AbstractStreamOperatorTestHarness(
 
         this.taskMailbox = new TaskMailboxImpl();
 
+        try {
+            this.checkpointStorageAccess = environment.getCheckpointStorageAccess();
+        } catch (Exception e) {
+            // do nothing, use default value
+        }
+

Review comment:
       ```suggestion
           // TODO remove this once we introduce AbstractStreamOperatorTestHarnessBuilder.
           try {
               this.checkpointStorageAccess = environment.getCheckpointStorageAccess();
           } catch (NullPointerException | UnsupportedOperationException e) {
               // cannot get checkpoint storage from environment, use default one.
           }
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org