You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/01 03:17:42 UTC

[GitHub] [flink] Myasuka commented on a change in pull request #19177: [FLINK-23399][state] Add a benchmark for rescaling

Myasuka commented on a change in pull request #19177:
URL: https://github.com/apache/flink/pull/19177#discussion_r840197342



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark<KEY> {
+    private final int maxParallelism;
+
+    private final int parallelismBefore;
+    private final int parallelismAfter;
+
+    private final int managedMemorySize;
+
+    private final StateBackend stateBackend;
+    private final CheckpointStorageAccess checkpointStorageAccess;
+
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState stateForSubtask;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    private final StreamRecordGenerator<KEY> streamRecordGenerator;
+    private final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+
+    public RescalingBenchmark(
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final int managedMemorySize,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess,
+            final StreamRecordGenerator<KEY> streamRecordGenerator,
+            final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier) {
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.managedMemorySize = managedMemorySize;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+        this.streamRecordGenerator = streamRecordGenerator;
+        this.stateProcessFunctionSupplier = stateProcessFunctionSupplier;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /** rescaling on one subtask, this is the benchmark entrance. */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(stateForSubtask);
+    }
+
+    /** close test harness for subtask. */
+    public void closeHarnessForSubtask() throws Exception {

Review comment:
       I think this method could be renamed to `closeOperator`, which reads better because the name would no longer be tied to the test-harness implementation.
   Remember to update the related comments.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkBuilder.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Supplier;
+
+/** Builder for rescalingBenchmark. */
+public class RescalingBenchmarkBuilder<KEY> {
+    private int maxParallelism = 128;
+    private int parallelismBefore = 2;
+    private int parallelismAfter = 1;
+    private int managedMemorySize = 512 * 1024 * 1024;
+    private StateBackend stateBackend = new EmbeddedRocksDBStateBackend();
+    private RescalingBenchmark.StreamRecordGenerator<KEY> streamRecordGenerator;
+    private Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+    private CheckpointStorageAccess checkpointStorageAccess;
+
+    public RescalingBenchmarkBuilder setMaxParallelism(int maxParallelism) {

Review comment:
       It would be better to return `RescalingBenchmarkBuilder<KEY>` here; the same applies to the methods below.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/** Test Rescaling benchmark. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    private static final Random random = new Random(0);

Review comment:
       Actually, we can replace this `random` with `ThreadLocalRandom.current()`, since we do not need deterministic results.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/** Test Rescaling benchmark. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    private static final Random random = new Random(0);
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingOut() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmarkBuilder()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(1)
+                        .setParallelismAfter(2)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(() -> new TestKeyedFunction())
+                        .build();
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeHarnessForSubtask();
+        benchmark.tearDown();
+    }
+
+    @Test
+    public void testScalingIn() throws Exception {
+        RescalingBenchmark benchmark =
+                new RescalingBenchmarkBuilder()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(2)
+                        .setParallelismAfter(1)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(() -> new TestKeyedFunction())
+                        .build();
+        benchmark.setUp();
+        benchmark.prepareStateAndHarnessForSubtask(0);
+        benchmark.rescale();
+        benchmark.closeHarnessForSubtask();
+        benchmark.tearDown();
+    }
+
+    @NotThreadSafe

Review comment:
       Since we only generate and process records one at a time, why do we need this annotation here?

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from savepoint. */

Review comment:
       ```suggestion
   /** The benchmark of rescaling from checkpoint. */
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from savepoint. */
+public class RescalingBenchmark<KEY> {
+    private final int maxParallelism;
+
+    private final int parallelismBefore;
+    private final int parallelismAfter;
+
+    private final int managedMemorySize;
+
+    private final StateBackend stateBackend;
+    private final CheckpointStorageAccess checkpointStorageAccess;
+
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState stateForSubtask;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    private final StreamRecordGenerator<KEY> streamRecordGenerator;
+    private final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+
+    public RescalingBenchmark(
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final int managedMemorySize,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess,
+            final StreamRecordGenerator<KEY> streamRecordGenerator,
+            final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier) {
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.managedMemorySize = managedMemorySize;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+        this.streamRecordGenerator = streamRecordGenerator;
+        this.stateProcessFunctionSupplier = stateProcessFunctionSupplier;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /** rescaling on one subtask, this is the benchmark entrance. */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(stateForSubtask);
+    }
+
+    /** close test harness for subtask. */
+    public void closeHarnessForSubtask() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** prepare state and harness for subtask. */
+    public void prepareStateAndHarnessForSubtask(int subtaskIndex) throws Exception {

Review comment:
       Similarly, I think this method could be renamed to `prepareStateForOperator`.
   Remember to update the related comments.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/** Test Rescaling benchmark. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    private static final Random random = new Random(0);
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingOut() throws Exception {
+
+        RescalingBenchmark benchmark =
+                new RescalingBenchmarkBuilder()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(1)
+                        .setParallelismAfter(2)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(() -> new TestKeyedFunction())
+                        .build();

Review comment:
       ```suggestion
           RescalingBenchmark<Integer> benchmark =
                   new RescalingBenchmarkBuilder<Integer>()
                           .setMaxParallelism(128)
                           .setParallelismBefore(1)
                           .setParallelismAfter(2)
                           .setManagedMemorySize(512 * 1024 * 1024)
                           .setCheckpointStorageAccess(
                                   new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
                                           .createCheckpointStorage(new JobID()))
                           .setStateBackend(new EmbeddedRocksDBStateBackend(true))
                           .setStreamRecordGenerator(new IntegerRecordGenerator())
                           .setStateProcessFunctionSupplier(TestKeyedFunction::new)
                           .build();
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org