Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/11 09:13:29 UTC

[GitHub] [flink] Myasuka commented on a change in pull request #19033: [FLINK-21321][Runtime/StateBackends] improve RocksDB incremental rescale performance by using deleteRange operator

Myasuka commented on a change in pull request #19033:
URL: https://github.com/apache/flink/pull/19033#discussion_r824522047



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -861,6 +1010,16 @@ public void initializeState(FunctionInitializationContext context) throws Except
                     context.getKeyedStateStore()
                             .getState(new ValueStateDescriptor<>("sum", Integer.class, 0));
         }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            completedCheckpointNum.getAndIncrement();
+        }
+
+        @Override
+        public void notifyCheckpointAborted(long checkpointId) throws Exception {
+            CheckpointListener.super.notifyCheckpointAborted(checkpointId);
+        }

Review comment:
       We don't need this override: `CheckpointListener#notifyCheckpointAborted` already has a default no-op implementation, and this version only delegates back to it.
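
       For reference, a minimal sketch of the interface (paraphrased, not the exact Flink source):

```java
public interface CheckpointListener {

    void notifyCheckpointComplete(long checkpointId) throws Exception;

    // The default is a no-op, so overriding it only to call
    // CheckpointListener.super.notifyCheckpointAborted(...) adds nothing.
    default void notifyCheckpointAborted(long checkpointId) throws Exception {}
}
```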

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -290,6 +301,139 @@ public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMax
         }
     }
 
+    @Test
+    public void testCheckpointRescalingInKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(false, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(true, false);
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(false, true);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+        testCheckpointRescalingKeyedState(true, true);
+    }
+
+    /**
+     * Tests that a job with purely keyed state can be restarted from a checkpoint with a different
+     * parallelism.
+     */
+    public void testCheckpointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism)
+            throws Exception {
+        final int numberKeys = 42;
+        final int numberElements = 1000;
+        final int numberElements2 = 500;
+        final int parallelism = scaleOut ? 3 : 4;
+        final int parallelism2 = scaleOut ? 4 : 3;
+        final int maxParallelism = 13;
+        Duration timeout = Duration.ofMinutes(3);
+        Deadline deadline = Deadline.now().plus(timeout);
+        ClusterClient<?> client = cluster.getClusterClient();
+        try {
+            JobGraph jobGraph =
+                    createJobGraphWithKeyedState(
+                            parallelism, maxParallelism, numberKeys, numberElements, false, 100);
+            final JobID jobID = jobGraph.getJobID();
+            // make sure the job does not finish before we cancel it
+            StateSourceBase.canFinishLatch = new CountDownLatch(1);
+            client.submitJob(jobGraph).get();
+            // wait till the sources have emitted numberElements for each key
+            assertTrue(
+                    SubtaskIndexFlatMapper.workCompletedLatch.await(
+                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
+            // verify the current state
+            Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
+            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+                expectedResult.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelism, parallelism, keyGroupIndex),
+                                numberElements * key));
+            }
+            assertEquals(expectedResult, actualResult);
+            // don't cancel source before we get a completed checkpoint
+            while (SubtaskIndexFlatMapper.completedCheckpointNum.get() / parallelism < 1) {
+                Thread.sleep(10);
+            }
+            String checkpointPath = getLastedCheckpointPath();
+            while (checkpointPath == null) {
+                Thread.sleep(10);
+                checkpointPath = getLastedCheckpointPath();
+            }
+            // clear the CollectionSink set for the restarted job
+            CollectionSink.clearElementsSet();
+            StateSourceBase.canFinishLatch.countDown();
+            client.cancel(jobID).get();
+            while (!getRunningJobs(client).isEmpty()) {
+                Thread.sleep(50);
+            }
+            int restoreMaxParallelism =
+                    deriveMaxParallelism ? JobVertex.MAX_PARALLELISM_DEFAULT : maxParallelism;
+            JobGraph scaledJobGraph =
+                    createJobGraphWithKeyedState(
+                            parallelism2,
+                            restoreMaxParallelism,
+                            numberKeys,
+                            numberElements + numberElements2,
+                            true,
+                            100);
+            scaledJobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(checkpointPath));
+            submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
+            assertTrue(
+                    SubtaskIndexFlatMapper.workCompletedLatch.await(
+                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
+            Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
+            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+                expectedResult2.add(
+                        Tuple2.of(
+                                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelism, parallelism2, keyGroupIndex),
+                                key * (numberElements + numberElements2)));
+            }
+            assertEquals(expectedResult2, actualResult2);
+        } finally {
+            // clear the CollectionSink set for the restarted job
+            CollectionSink.clearElementsSet();
+        }
+    }
+
+    private String getLastedCheckpointPath() {
+        String checkpointPath = null;
+        int ckpId = 0;
+        for (File instanceFile : checkpointDir.listFiles()) {
+            for (File file : instanceFile.listFiles()) {
+                if (!file.getName().startsWith("chk-")) {
+                    continue;
+                }
+                boolean flag = false;

Review comment:
       I think a variable named `foundCheckpoint` would read better than `flag`.
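
       For illustration, the loop with the rename applied (assuming the body just probes for the metadata file, as the hunk quoted further below shows):

```java
boolean foundCheckpoint = false;
for (File child : file.listFiles()) {
    // A completed checkpoint directory contains a metadata file.
    if (child.getName().endsWith("_metadata")) {
        foundCheckpoint = true;
        break;
    }
}
```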

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -95,23 +98,28 @@
     private static final int slotsPerTaskManager = 2;
     private static final int numSlots = numTaskManagers * slotsPerTaskManager;
 
-    @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}")
+    @Parameterized.Parameters(
+            name = "backend = {0}, buffersPerChannel = {1}, incrementalFlag = {2}")

Review comment:
      ```suggestion
              name = "backend = {0}, buffersPerChannel = {1}, incrementalCheckpoint = {2}")
      ```

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -290,6 +301,139 @@ public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMax
         }
     }
 
+
+    /**
+     * Tests that a job with purely keyed state can be restarted from a checkpoint with a different
+     * parallelism.
+     */
+    public void testCheckpointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism)

Review comment:
       I believe most of this code could be shared with `testSavepointRescalingKeyedState`; let's simplify the logic, e.g. as sketched below.
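
       Illustrative only — `testRescalingKeyedState` and its flag are hypothetical names:

```java
// Both variants could delegate to one parameterized helper that either
// triggers a savepoint or waits for a completed checkpoint before rescaling.
public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism)
        throws Exception {
    testRescalingKeyedState(scaleOut, deriveMaxParallelism, /* restoreFromCheckpoint */ false);
}

public void testCheckpointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism)
        throws Exception {
    testRescalingKeyedState(scaleOut, deriveMaxParallelism, /* restoreFromCheckpoint */ true);
}
```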

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
##########
@@ -290,6 +301,139 @@ public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMax
         }
     }
 
+
+    private String getLastedCheckpointPath() {
+        String checkpointPath = null;
+        int ckpId = 0;
+        for (File instanceFile : checkpointDir.listFiles()) {
+            for (File file : instanceFile.listFiles()) {
+                if (!file.getName().startsWith("chk-")) {
+                    continue;
+                }
+                boolean flag = false;
+                for (File child : file.listFiles()) {
+                    if (child.getName().endsWith("_metadata")) {

Review comment:
       We can replace the hard-coded `_metadata` literal with `AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME`.
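
       For example (sketch; the import path is assumed):

```java
import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME;

// METADATA_FILE_NAME is the "_metadata" constant, so the check stays in sync
// with the checkpoint storage layout instead of hard-coding the literal.
if (child.getName().endsWith(METADATA_FILE_NAME)) {
    foundCheckpoint = true;
    break;
}
```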

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
##########
@@ -153,30 +149,15 @@ private static void deleteRange(
             RocksDB db,
             List<ColumnFamilyHandle> columnFamilyHandles,
             byte[] beginKeyBytes,
-            byte[] endKeyBytes,
-            @Nonnegative long writeBatchSize)
+            byte[] endKeyBytes)
             throws RocksDBException {
 
         for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-            try (ReadOptions readOptions = new ReadOptions();
-                    RocksIteratorWrapper iteratorWrapper =
-                            RocksDBOperationUtils.getRocksIterator(
-                                    db, columnFamilyHandle, readOptions);
-                    RocksDBWriteBatchWrapper writeBatchWrapper =
-                            new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
-
-                iteratorWrapper.seek(beginKeyBytes);
-
-                while (iteratorWrapper.isValid()) {
-                    final byte[] currentKey = iteratorWrapper.key();
-                    if (beforeThePrefixBytes(currentKey, endKeyBytes)) {
-                        writeBatchWrapper.remove(columnFamilyHandle, currentKey);
-                    } else {
-                        break;
-                    }
-                    iteratorWrapper.next();
-                }
-            }
+            // Using RocksDB's deleteRange will take advantage of delete
+            // tombstones, which mark the range as deleted.
+            //
+            // https://github.com/facebook/rocksdb/blob/bcd32560dd5898956b9d24553c2bb3c1b1d2319f/include/rocksdb/db.h#L357-L371

Review comment:
       Maybe we could point to the FRocksDB link instead, since that is the RocksDB fork Flink actually ships.
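
       For context, a minimal sketch of what the range delete boils down to in the RocksJava API (parameter names follow the method above; the `WriteOptions` usage is illustrative):

```java
// One range tombstone per column family covers [beginKeyBytes, endKeyBytes),
// replacing the old per-key iterate-and-delete loop.
try (WriteOptions writeOptions = new WriteOptions()) {
    for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
        db.deleteRange(columnFamilyHandle, writeOptions, beginKeyBytes, endKeyBytes);
    }
}
```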




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org