Posted to commits@flink.apache.org by ta...@apache.org on 2022/03/21 03:11:07 UTC

[flink] 01/02: [FLINK-21321]: change RocksDB rescale to use deleteRange

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6822f293e24263752419e49f8b7910f4e0464a8
Author: Joey Pereira <jo...@pereira.io>
AuthorDate: Tue May 26 08:26:58 2020 -0400

    [FLINK-21321]: change RocksDB rescale to use deleteRange
    
    Previously, the Flink incremental checkpoint restore operation would
    scan and delete individual keys during recovery when rescaling. This
    was done to truncate the parts of the restored checkpoints that are no
    longer within the key-group range assigned to a worker.
    
    Now, this clean-up is performed with RocksDB's deleteRange operation,
    which is preferred because it removes data cheaply by writing range
    tombstones instead of deleting each key individually.
    
    The RocksDB API for DeleteRange is here,
    https://github.com/facebook/rocksdb/blob/bcd32560dd5898956b9d24553c2bb3c1b1d2319f/include/rocksdb/db.h#L357-L373
    
    Tombstones are described in further detail here,
    https://rocksdb.org/blog/2018/11/21/delete-range.html
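    
    For illustration, the following is a minimal, self-contained sketch of
    the RocksJava deleteRange call this change relies on. The database
    path and key bounds below are placeholders for the example only; the
    patch itself uses the overload that also takes a ColumnFamilyHandle.
    
        import org.rocksdb.Options;
        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;
    
        public class DeleteRangeSketch {
            public static void main(String[] args) throws RocksDBException {
                RocksDB.loadLibrary();
                try (Options options = new Options().setCreateIfMissing(true);
                        RocksDB db = RocksDB.open(options, "/tmp/delete-range-sketch")) {
                    // Remove every key in [beginKey, endKey) of the default column
                    // family by writing a single range tombstone, rather than
                    // issuing one delete per key.
                    byte[] beginKey = {0x00, 0x05};
                    byte[] endKey = {0x00, 0x0A};
                    db.deleteRange(beginKey, endKey);
                }
            }
        }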
    
    Additionally, this adds a benchmark test based on
    RocksIncrementalCheckpointRescalingTest, which exercises the modified
    rescaling code.
---
 .../state/RocksDBIncrementalCheckpointUtils.java   |  37 +---
 .../RocksDBIncrementalRestoreOperation.java        |   3 +-
 .../RocksDBIncrementalCheckpointUtilsTest.java     |   3 +-
 ...ncrementalCheckpointRescalingBenchmarkTest.java | 240 +++++++++++++++++++++
 4 files changed, 251 insertions(+), 32 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 4f73ff8..467156c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -108,8 +107,7 @@ public class RocksDBIncrementalCheckpointUtils {
             @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
             @Nonnull KeyGroupRange targetKeyGroupRange,
             @Nonnull KeyGroupRange currentKeyGroupRange,
-            @Nonnegative int keyGroupPrefixBytes,
-            @Nonnegative long writeBatchSize)
+            @Nonnegative int keyGroupPrefixBytes)
             throws RocksDBException {
 
         final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
@@ -120,8 +118,7 @@ public class RocksDBIncrementalCheckpointUtils {
                     currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
             CompositeKeySerializationUtils.serializeKeyGroup(
                     targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
-            deleteRange(
-                    db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize);
+            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
         }
 
         if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
@@ -129,8 +126,7 @@ public class RocksDBIncrementalCheckpointUtils {
                     targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
             CompositeKeySerializationUtils.serializeKeyGroup(
                     currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
-            deleteRange(
-                    db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes, writeBatchSize);
+            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
         }
     }
 
@@ -146,30 +142,15 @@ public class RocksDBIncrementalCheckpointUtils {
             RocksDB db,
             List<ColumnFamilyHandle> columnFamilyHandles,
             byte[] beginKeyBytes,
-            byte[] endKeyBytes,
-            @Nonnegative long writeBatchSize)
+            byte[] endKeyBytes)
             throws RocksDBException {
 
         for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-            try (ReadOptions readOptions = new ReadOptions();
-                    RocksIteratorWrapper iteratorWrapper =
-                            RocksDBOperationUtils.getRocksIterator(
-                                    db, columnFamilyHandle, readOptions);
-                    RocksDBWriteBatchWrapper writeBatchWrapper =
-                            new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
-
-                iteratorWrapper.seek(beginKeyBytes);
-
-                while (iteratorWrapper.isValid()) {
-                    final byte[] currentKey = iteratorWrapper.key();
-                    if (beforeThePrefixBytes(currentKey, endKeyBytes)) {
-                        writeBatchWrapper.remove(columnFamilyHandle, currentKey);
-                    } else {
-                        break;
-                    }
-                    iteratorWrapper.next();
-                }
-            }
+            // Using RocksDB's deleteRange will take advantage of delete
+            // tombstones, which mark the range as deleted.
+            //
+            // https://github.com/facebook/rocksdb/blob/bcd32560dd5898956b9d24553c2bb3c1b1d2319f/include/rocksdb/db.h#L357-L371
+            db.deleteRange(columnFamilyHandle, beginKeyBytes, endKeyBytes);
         }
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 657b6f2..d6ec9ae6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -390,8 +390,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
                     this.rocksHandle.getColumnFamilyHandles(),
                     keyGroupRange,
                     initialHandle.getKeyGroupRange(),
-                    keyGroupPrefixBytes,
-                    writeBatchSize);
+                    keyGroupPrefixBytes);
         } catch (RocksDBException e) {
             String errMsg = "Failed to clip DB after initialization.";
             logger.error(errMsg, e);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index ff6d854..2de72a7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -185,8 +185,7 @@ public class RocksDBIncrementalCheckpointUtilsTest extends TestLogger {
                     Collections.singletonList(columnFamilyHandle),
                     targetGroupRange,
                     currentGroupRange,
-                    keyGroupPrefixBytes,
-                    RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes());
+                    keyGroupPrefixBytes);
 
             for (int i = currentGroupRangeStart; i <= currentGroupRangeEnd; ++i) {
                 for (int j = 0; j < 100; ++j) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingBenchmarkTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingBenchmarkTest.java
new file mode 100644
index 0000000..a4e267b
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingBenchmarkTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.List;
+
+/** Test runs the benchmark for incremental checkpoint rescaling. */
+public class RocksIncrementalCheckpointRescalingBenchmarkTest extends TestLogger {
+
+    @Rule public TemporaryFolder rootFolder = new TemporaryFolder();
+
+    private static final int maxParallelism = 10;
+
+    private static final int recordCount = 1_000;
+
+    /** partitionParallelism is the parallelism to use for creating the partitionedSnapshot. */
+    private static final int partitionParallelism = 2;
+
+    /**
+     * repartitionParallelism is the parallelism to use during the test for the repartition step.
+     *
+     * <p>NOTE: To trigger {@link
+     * org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation#restoreWithRescaling(Collection)},
+     * where the improvement code is exercised, the target parallelism must not be divisible by
+     * {@link partitionParallelism}. If this parallelism were instead 4, there would be no rescaling.
+     */
+    private static final int repartitionParallelism = 3;
+
+    /** partitionedSnapshot is a partitioned incremental RocksDB snapshot. */
+    private OperatorSubtaskState partitionedSnapshot;
+
+    private KeySelector<Integer, Integer> keySelector = new TestKeySelector();
+
+    /**
+     * The benchmark's preparation will:
+     *
+     * <ol>
+     *   <li>Create a stateful operator and process records to persist state.
+     *   <li>Snapshot the state and re-partition it so the test operates on a partitioned state.
+     * </ol>
+     *
+     * @throws Exception
+     */
+    @Before
+    public void before() throws Exception {
+        OperatorSubtaskState snapshot;
+        // Initialize the test harness with a task parallelism of 1.
+        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> harness =
+                getHarnessTest(keySelector, maxParallelism, 1, 0)) {
+            // Set the state backend of the harness to RocksDB.
+            harness.setStateBackend(getStateBackend());
+            // Initialize the harness.
+            harness.open();
+            // Push the test records into the operator to trigger state updates.
+            Integer[] records = new Integer[recordCount];
+            for (int i = 0; i < recordCount; i++) {
+                harness.processElement(new StreamRecord<>(i, 1));
+            }
+            // Grab a snapshot of the state.
+            snapshot = harness.snapshot(0, 0);
+        }
+
+        // Now, re-partition to create a partitioned state.
+        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>[] partitionedTestHarness =
+                new KeyedOneInputStreamOperatorTestHarness[partitionParallelism];
+        List<KeyGroupRange> keyGroupPartitions =
+                StateAssignmentOperation.createKeyGroupPartitions(
+                        maxParallelism, partitionParallelism);
+        try {
+            for (int i = 0; i < partitionParallelism; i++) {
+                // Initialize, open, and then re-snapshot the two subtasks to create a partitioned
+                // incremental RocksDB snapshot.
+                OperatorSubtaskState subtaskState =
+                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                                snapshot, maxParallelism, 1, partitionParallelism, i);
+                KeyGroupRange localKeyGroupRange20 = keyGroupPartitions.get(i);
+
+                partitionedTestHarness[i] =
+                        getHarnessTest(keySelector, maxParallelism, partitionParallelism, i);
+                partitionedTestHarness[i].setStateBackend(getStateBackend());
+                partitionedTestHarness[i].setup();
+                partitionedTestHarness[i].initializeState(subtaskState);
+                partitionedTestHarness[i].open();
+            }
+
+            partitionedSnapshot =
+                    AbstractStreamOperatorTestHarness.repackageState(
+                            partitionedTestHarness[0].snapshot(1, 2),
+                            partitionedTestHarness[1].snapshot(1, 2));
+
+        } finally {
+            closeHarness(partitionedTestHarness);
+        }
+    }
+
+    @Test(timeout = 1000)
+    @RetryOnFailure(times = 3)
+    public void benchmarkScalingUp() throws Exception {
+        long benchmarkTime = 0;
+
+        // Trigger the incremental re-scaling via restoreWithRescaling by repartitioning it from
+        // parallelism of >1 to a higher parallelism. Time spent during this step includes the cost
+        // of incremental rescaling.
+        List<KeyGroupRange> keyGroupPartitions =
+                StateAssignmentOperation.createKeyGroupPartitions(
+                        maxParallelism, repartitionParallelism);
+
+        long fullStateSize = partitionedSnapshot.getStateSize();
+
+        for (int i = 0; i < repartitionParallelism; i++) {
+            OperatorSubtaskState subtaskState =
+                    AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                            partitionedSnapshot,
+                            maxParallelism,
+                            partitionParallelism,
+                            repartitionParallelism,
+                            i);
+            KeyGroupRange localKeyGroupRange20 = keyGroupPartitions.get(i);
+
+            try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> subtaskHarness =
+                    getHarnessTest(keySelector, maxParallelism, repartitionParallelism, i)) {
+                RocksDBStateBackend backend = getStateBackend();
+                subtaskHarness.setStateBackend(backend);
+                subtaskHarness.setup();
+
+                // Precisely measure the call-site that triggers the restore operation.
+                long startingTime = System.nanoTime();
+                subtaskHarness.initializeState(subtaskState);
+                benchmarkTime += System.nanoTime() - startingTime;
+            }
+        }
+
+        log.error(
+                "--------------> performance for incremental checkpoint re-scaling <--------------");
+        log.error(
+                "rescale from {} to {} with {} records took: {} nanoseconds",
+                partitionParallelism,
+                repartitionParallelism,
+                recordCount,
+                benchmarkTime);
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> getHarnessTest(
+            KeySelector<Integer, Integer> keySelector,
+            int maxParallelism,
+            int taskParallelism,
+            int subtaskIdx)
+            throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(new TestKeyedFunction()),
+                keySelector,
+                BasicTypeInfo.INT_TYPE_INFO,
+                maxParallelism,
+                taskParallelism,
+                subtaskIdx);
+    }
+
+    private void closeHarness(KeyedOneInputStreamOperatorTestHarness<?, ?, ?>[] harnessArr)
+            throws Exception {
+        for (KeyedOneInputStreamOperatorTestHarness<?, ?, ?> harness : harnessArr) {
+            if (harness != null) {
+                harness.close();
+            }
+        }
+    }
+
+    private RocksDBStateBackend getStateBackend() throws Exception {
+        return new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true);
+    }
+
+    /** A simple keyed function for tests. */
+    private class TestKeyedFunction extends KeyedProcessFunction<Integer, Integer, Integer> {
+
+        public ValueStateDescriptor<Integer> stateDescriptor;
+        private ValueState<Integer> counterState;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            stateDescriptor = new ValueStateDescriptor<Integer>("counter", Integer.class);
+            counterState = this.getRuntimeContext().getState(stateDescriptor);
+        }
+
+        @Override
+        public void processElement(Integer incomingValue, Context ctx, Collector<Integer> out)
+                throws Exception {
+            Integer oldValue = counterState.value();
+            Integer newValue = oldValue != null ? oldValue + incomingValue : incomingValue;
+            counterState.update(newValue);
+            out.collect(newValue);
+        }
+    }
+
+    /** A simple key selector for tests. */
+    private class TestKeySelector implements KeySelector<Integer, Integer> {
+        @Override
+        public Integer getKey(Integer value) throws Exception {
+            return value;
+        }
+    }
+}