You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/06/29 14:09:13 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

tkalkirill opened a new pull request, #907:
URL: https://github.com/apache/ignite-3/pull/907

   https://issues.apache.org/jira/browse/IGNITE-17267


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914808885


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Cumulative sizes: the size of each element in {@link #dirtyPages} plus the previous value in this array. */
+        private final int[] sizes;

Review Comment:
   Fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914873743


##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.QueueResult;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointDirtyPages} testing.
+ */
+public class CheckpointDirtyPagesTest {
+    @Test
+    void testDirtyPagesCount() {
+        var dirtyPages0 = createDirtyPages(of(0, 0, 0), of(0, 0, 1));

Review Comment:
   You're right, I wanted to shorten the string length.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914824876


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Cumulative sizes: the size of each element in {@link #dirtyPages} plus the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@code true} if the next element of the queue was obtained.
+         *
+         * @param result Holder for the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);
+                }
+            }
+
+            result.index = index;
+            result.position = index > 0 ? queuePosition - sizes[index - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@code true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(sizes, index, sizes.length, position) + 1);
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+    }
+
+    /**
+     * View of {@link CheckpointDirtyPages} in which all dirty pages will refer to the same {@link PersistentPageMemory} and contain the
+     * same groupId and partitionId and increasing pageIdx.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesView {
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private final int index;
+
+        /** Starting position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int fromPosition;
+
+        /** End position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int toPosition;
+
+        /**
+         * Private constructor.
+         *
+         * @param index Element index in {@link CheckpointDirtyPages#dirtyPages}.
+         * @param fromPosition Starting position (inclusive) of the dirty page within the element at {@link #index}.
+         * @param toPosition End position (inclusive) of the dirty page within the element at {@link #index}.
+         */
+        private CheckpointDirtyPagesView(int index, int fromPosition, int toPosition) {
+            this.index = index;
+            this.fromPosition = fromPosition;
+            this.toPosition = toPosition;
+        }
+
+        /**
+         * Returns the dirty page by index.
+         *
+         * @param index Dirty page index.
+         */
+        public FullPageId get(int index) {
+            return dirtyPages.get(this.index).getValue().get(fromPosition + index);
+        }
+
+        /**
+         * Returns the page memory for view.
+         */
+        public PersistentPageMemory pageMemory() {
+            return dirtyPages.get(index).getKey();
+        }
+
+        /**
+         * Returns the size of the view.
+         */
+        public int size() {
+            return toPosition - fromPosition + 1;
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+
+        private boolean isToPositionLast() {
+            return toPosition == dirtyPages.get(index).getValue().size() - 1;
+        }
+    }
+
+    /**
+     * Holder for the result of getting the next dirty page in {@link CheckpointDirtyPagesQueue#next(QueueResult)}.
+     *
+     * <p>Not thread safe.
+     */
+    static class QueueResult {
+        private @Nullable CheckpointDirtyPagesQueue owner;
+
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private int index;
+
+        /** Position of the dirty page within the element at {@link #index}. */
+        private int position;
+
+        /**
+         * Returns the page memory for the associated dirty page.
+         */
+        public @Nullable PersistentPageMemory pageMemory() {
+            return owner == null ? null : owner.owner().dirtyPages.get(index).getKey();
+        }
+
+        /**
+         * Returns dirty page.
+         */
+        public @Nullable FullPageId dirtyPage() {
+            return owner == null ? null : owner.owner().dirtyPages.get(index).getValue().get(position);
+        }
+    }
+
+    private static boolean equalsByGroupAndPartition(FullPageId pageId0, FullPageId pageId1) {
+        return pageId0.groupId() == pageId1.groupId() && partitionId(pageId0.pageId()) == partitionId(pageId1.pageId());

Review Comment:
   It makes sense, I'll do it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r913602644


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java:
##########
@@ -17,36 +17,33 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import org.apache.ignite.internal.pagememory.FullPageId;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-
 /**
  * Data class of checkpoint information.
  */
 class Checkpoint {
-    /** Checkpoint pages. */
-    final IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> dirtyPages;
+    /** Sorted dirty pages from data regions that should be checkpointed. */
+    final CheckpointDirtyPages dirtyPages;
 
     /** Checkpoint progress status. */
     final CheckpointProgressImpl progress;
 
     /** Number of dirty pages. */
-    final int dirtyPagesSize;
+    final long dirtyPagesSize;

Review Comment:
   Really?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914792899


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */

Review Comment:
   Fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914858687


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Cumulative sizes: the size of each element in {@link #dirtyPages} plus the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@code true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);
+                }
+            }
+
+            result.index = index;
+            result.position = index > 0 ? queuePosition - sizes[index - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@code true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(sizes, index, sizes.length, position) + 1);
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+    }
+
+    /**
+     * View of {@link CheckpointDirtyPages} in which all dirty pages will refer to the same {@link PersistentPageMemory} and contain the
+     * same groupId and partitionId and increasing pageIdx.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesView {
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private final int index;
+
+        /** Starting position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int fromPosition;
+
+        /** End position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int toPosition;
+
+        /**
+         * Private constructor.
+         *
+         * @param index Element index in {@link CheckpointDirtyPages#dirtyPages}.
+         * @param fromPosition Starting position (inclusive) of the dirty page within the element at {@link #index}.
+         * @param toPosition End position (inclusive) of the dirty page within the element at {@link #index}.
+         */
+        private CheckpointDirtyPagesView(int index, int fromPosition, int toPosition) {
+            this.index = index;
+            this.fromPosition = fromPosition;
+            this.toPosition = toPosition;
+        }
+
+        /**
+         * Returns the dirty page by index.
+         *
+         * @param index Dirty page index.
+         */
+        public FullPageId get(int index) {
+            return dirtyPages.get(this.index).getValue().get(fromPosition + index);
+        }
+
+        /**
+         * Returns the page memory for view.
+         */
+        public PersistentPageMemory pageMemory() {
+            return dirtyPages.get(index).getKey();
+        }
+
+        /**
+         * Returns the size of the view.
+         */
+        public int size() {
+            return toPosition - fromPosition + 1;
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+
+        private boolean isToPositionLast() {
+            return toPosition == dirtyPages.get(index).getValue().size() - 1;
+        }
+    }
+
+    /**
+     * Holder is the result of getting the next dirty page in {@link CheckpointDirtyPagesQueue#next(QueueResult)}.
+     *
+     * <p>Not thread safe.
+     */
+    static class QueueResult {
+        private @Nullable CheckpointDirtyPagesQueue owner;
+
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private int index;
+
+        /** Position of the dirty page within the element at {@link #index}. */
+        private int position;
+
+        /**
+         * Returns the page memory for the associated dirty page.
+         */
+        public @Nullable PersistentPageMemory pageMemory() {
+            return owner == null ? null : owner.owner().dirtyPages.get(index).getKey();
+        }
+
+        /**
+         * Returns dirty page.
+         */
+        public @Nullable FullPageId dirtyPage() {
+            return owner == null ? null : owner.owner().dirtyPages.get(index).getValue().get(position);
+        }
+    }
+
+    private static boolean equalsByGroupAndPartition(FullPageId pageId0, FullPageId pageId1) {

Review Comment:
   I did not find such a rule in 3.0; if it does not exist, then let it remain as it is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] rpuch commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r915070085


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Cumulative sizes: the size of each element in {@link #dirtyPages} plus the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@code true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);
+                }
+            }
+
+            result.index = index;
+            result.position = index > 0 ? queuePosition - sizes[index - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@code true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(sizes, index, sizes.length, position) + 1);

Review Comment:
   Oh, I missed that the position can fall between the elements of `cumulativeSizes`, in which case the value returned by the binary search will be negative. So it's OK, my bad.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] rpuch commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r915067225


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -2095,6 +2095,7 @@ public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> allowToReplac
 
         safeToUpdate.set(true);
 
+        // Less memory and no need for Set features.

Review Comment:
   Yes, `concat()` seems better



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;

Review Comment:
   Yes, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914823712


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;

Review Comment:
   `regionIndex` - I think it fits better, I'll rename it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] rpuch commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r915074458


##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java:
##########
@@ -181,13 +171,13 @@ void testMarkCheckpointBegin() throws Exception {
 
         CheckpointReadWriteLock readWriteLock = newReadWriteLock(log);
 
-        List<FullPageId> dirtyPages = List.of(new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0));
+        List<FullPageId> dirtyPages = List.of(of(0, 0, 0), of(0, 0, 1), of(0, 0, 2));

Review Comment:
   Yes, let's leave it, it's not a big deal



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914791162


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -2095,6 +2095,7 @@ public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> allowToReplac
 
         safeToUpdate.set(true);
 
+        // Less memory and no need for Set features.

Review Comment:
   Okay, I'll delete the comment.
   I can rename `CollectionUtils#union()` to `CollectionUtils#concat()`, I think it fits better, wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914791162


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -2095,6 +2095,7 @@ public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> allowToReplac
 
         safeToUpdate.set(true);
 
+        // Less memory and no need for Set features.

Review Comment:
   Okay, I'll delete the comment.
   I can rename `CollectionUtils#union()` to `CollectionUtils#concat()`, I think it fits better, wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] vldpyatkov merged pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
vldpyatkov merged PR #907:
URL: https://github.com/apache/ignite-3/pull/907


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914822200


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@code true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);
+                }
+            }
+
+            result.index = index;
+            result.position = index > 0 ? queuePosition - sizes[index - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@code true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(sizes, index, sizes.length, position) + 1);
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+    }
+
+    /**
+     * View of {@link CheckpointDirtyPages} in which all dirty pages will refer to the same {@link PersistentPageMemory} and contain the
+     * same groupId and partitionId and increasing pageIdx.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesView {
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private final int index;
+
+        /** Starting position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int fromPosition;
+
+        /** End position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int toPosition;
+
+        /**
+         * Private constructor.
+         *
+         * @param index Element index in {@link CheckpointDirtyPages#dirtyPages}.
+         * @param fromPosition Starting position (inclusive) of the dirty page within the element at {@link #index}.
+         * @param toPosition End position (inclusive) of the dirty page within the element at {@link #index}.
+         */
+        private CheckpointDirtyPagesView(int index, int fromPosition, int toPosition) {
+            this.index = index;
+            this.fromPosition = fromPosition;
+            this.toPosition = toPosition;
+        }
+
+        /**
+         * Returns the dirty page by index.
+         *
+         * @param index Dirty page index.
+         */
+        public FullPageId get(int index) {

Review Comment:
   `regionIndex` - I think it fits better, I'll rename it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914794790


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()

Review Comment:
   Fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914803783


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;

Review Comment:
   Would such a comment be appropriate?
   `// toIndex cannot be 0 because endPageId is greater than startPageId by DIRTY_PAGE_COMPARATOR.`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914857080


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@link true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);
+                }
+            }
+
+            result.index = index;
+            result.position = index > 0 ? queuePosition - sizes[index - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@link true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(sizes, index, sizes.length, position) + 1);

Review Comment:
   Let's assume we have 4 regions with the following pages (written as FullPageId(grpId, partId), or FullPageId(grpId, partId, pageIdx) when a partition has several pages):
   - FullPageId(0, 0)
   - FullPageId(1, 0)
   - FullPageId(2, 0), FullPageId(2, 1)
   - FullPageId(3, 0, 0), FullPageId(3, 0, 1), FullPageId(3, 0, 2)
   
   Total size: 7
   cumulativeSizes: [1,2,4,7]
   
   Let's assume that the first thread got **FullPageId(0, 0)** and then did something else, so that on its next call to `CheckpointDirtyPagesQueue#next` the position is, say, 5. The binary search then returns -4 (insertion point 3), so `binarySearch(...) + 1` is -3, and `Math.abs` turns that into index 3.
   
   Thus, a negative value is possible, but we will not go beyond the bounds of **cumulativeSizes** because of the preceding check against the total size.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914870191


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -63,14 +61,8 @@
  * <p>{@link CheckpointWorkflow#markCheckpointEnd} - Finalization of last checkpoint.
  */
 class CheckpointWorkflow {
-    /**
-     * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])} in case
-     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
-     */
-    private final int parallelSortThreshold;
-
-    /** This number of threads will be created and used for parallel sorting. */
-    private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
+    /** Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])}. */
+    static final int PARALLEL_SORT_THRESHOLD = 40_000;

Review Comment:
   The link is in the ticket; do you think it also needs to be mentioned in the code comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914830639


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@link true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);

Review Comment:
   Fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914877427


##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java:
##########
@@ -181,13 +171,13 @@ void testMarkCheckpointBegin() throws Exception {
 
         CheckpointReadWriteLock readWriteLock = newReadWriteLock(log);
 
-        List<FullPageId> dirtyPages = List.of(new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0));
+        List<FullPageId> dirtyPages = List.of(of(0, 0, 0), of(0, 0, 1), of(0, 0, 2));

Review Comment:
   I can rename it, but the code would become cumbersome. This is a private method; can we leave it as is?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914871968


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -379,10 +378,12 @@ boolean writePages(
 
         tracker.onPagesWriteStart();
 
+        CheckpointDirtyPagesQueue checkpointDirtyPagesQueue = checkpointDirtyPages.toQueue();

Review Comment:
   Do you think it's necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r915100589


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -2095,6 +2095,7 @@ public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> allowToReplac
 
         safeToUpdate.set(true);
 
+        // Less memory and no need for Set features.

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] rpuch commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914472240


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -2095,6 +2095,7 @@ public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> allowToReplac
 
         safeToUpdate.set(true);
 
+        // Less memory and no need for Set features.

Review Comment:
   This comment seems to compare the current code with some alternative. But where is the alternative?
   
   I suggest either elaborating on what we are comparing the current code with, or removing the comment to avoid confusion.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()

Review Comment:
   Do we actually need this tiny optimization? We seem to save a couple of allocations per checkpoint, and checkpoints happen pretty rarely. This does not seem worth complicating the code with a conditional operator.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;

Review Comment:
   I suggest adding a comment that `toIndex` (before reassignment) cannot be 0, to make the reassignment expression a little bit easier to understand



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */

Review Comment:
   ```suggestion
           /** Sizes of each element in {@link #dirtyPages} + the previous value in this array. */
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@link true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);

Review Comment:
   It looks like we are already sure that `index` cannot be found by binary search. How about adding `+1` to the first argument to skip it?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@link true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);
+                }
+            }
+
+            result.index = index;
+            result.position = index > 0 ? queuePosition - sizes[index - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@link true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(sizes, index, sizes.length, position) + 1);
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+    }
+
+    /**
+     * View of {@link CheckpointDirtyPages} in which all dirty pages will refer to the same {@link PersistentPageMemory} and contain the
+     * same groupId and partitionId and increasing pageIdx.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesView {
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private final int index;
+
+        /** Starting position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int fromPosition;
+
+        /** End position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int toPosition;
+
+        /**
+         * Private constructor.
+         *
+         * @param index Element index in {@link CheckpointDirtyPages#dirtyPages}.
+         * @param fromPosition Starting position (inclusive) of the dirty page within the element at {@link #index}.
+         * @param toPosition End position (inclusive) of the dirty page within the element at {@link #index}.
+         */
+        private CheckpointDirtyPagesView(int index, int fromPosition, int toPosition) {
+            this.index = index;
+            this.fromPosition = fromPosition;
+            this.toPosition = toPosition;
+        }
+
+        /**
+         * Returns the dirty page by index.
+         *
+         * @param index Dirty page index.
+         */
+        public FullPageId get(int index) {
+            return dirtyPages.get(this.index).getValue().get(fromPosition + index);
+        }
+
+        /**
+         * Returns the page memory for view.
+         */
+        public PersistentPageMemory pageMemory() {
+            return dirtyPages.get(index).getKey();
+        }
+
+        /**
+         * Returns the size of the view.
+         */
+        public int size() {
+            return toPosition - fromPosition + 1;
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+
+        private boolean isToPositionLast() {
+            return toPosition == dirtyPages.get(index).getValue().size() - 1;
+        }
+    }
+
+    /**
+     * Holder is the result of getting the next dirty page in {@link CheckpointDirtyPagesQueue#next(QueueResult)}.
+     *
+     * <p>Not thread safe.
+     */
+    static class QueueResult {
+        private @Nullable CheckpointDirtyPagesQueue owner;
+
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private int index;
+
+        /** Position of the dirty page within the element at {@link #index}. */
+        private int position;
+
+        /**
+         * Returns the page memory for the associated dirty page.
+         */
+        public @Nullable PersistentPageMemory pageMemory() {
+            return owner == null ? null : owner.owner().dirtyPages.get(index).getKey();
+        }
+
+        /**
+         * Returns dirty page.
+         */
+        public @Nullable FullPageId dirtyPage() {
+            return owner == null ? null : owner.owner().dirtyPages.get(index).getValue().get(position);
+        }
+    }
+
+    private static boolean equalsByGroupAndPartition(FullPageId pageId0, FullPageId pageId1) {

Review Comment:
   We usually put all member types last in a file (after all methods). Is this a rule in Ignite 3 code? If yes, this method should probably be moved before the member classes.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -379,10 +378,12 @@ boolean writePages(
 
         tracker.onPagesWriteStart();
 
+        CheckpointDirtyPagesQueue checkpointDirtyPagesQueue = checkpointDirtyPages.toQueue();

Review Comment:
   Should we just inline this variable if it's only used in one place?



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.QueueResult;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointDirtyPages} testing.
+ */
+public class CheckpointDirtyPagesTest {
+    @Test
+    void testDirtyPagesCount() {
+        var dirtyPages0 = createDirtyPages(of(0, 0, 0), of(0, 0, 1));

Review Comment:
   I think the convention is to only use `var` with the `new` operator or with an explicit cast. Should an explicit type be used instead?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@link true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);
+                }
+            }
+
+            result.index = index;
+            result.position = index > 0 ? queuePosition - sizes[index - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@link true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(sizes, index, sizes.length, position) + 1);
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+    }
+
+    /**
+     * View of {@link CheckpointDirtyPages} in which all dirty pages will refer to the same {@link PersistentPageMemory} and contain the
+     * same groupId and partitionId and increasing pageIdx.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesView {
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private final int index;
+
+        /** Starting position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int fromPosition;
+
+        /** End position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int toPosition;
+
+        /**
+         * Private constructor.
+         *
+         * @param index Element index in {@link CheckpointDirtyPages#dirtyPages}.
+         * @param fromPosition Starting position (inclusive) of the dirty page within the element at {@link #index}.
+         * @param toPosition End position (inclusive) of the dirty page within the element at {@link #index}.
+         */
+        private CheckpointDirtyPagesView(int index, int fromPosition, int toPosition) {
+            this.index = index;
+            this.fromPosition = fromPosition;
+            this.toPosition = toPosition;
+        }
+
+        /**
+         * Returns the dirty page by index.
+         *
+         * @param index Dirty page index.
+         */
+        public FullPageId get(int index) {

Review Comment:
   Both the parameter and a field are named `index`, but they mean different things. How about renaming the field (not the parameter) to something more descriptive, like `regionIndex` or `pageMemoryIndex`, to avoid the clash?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */

Review Comment:
   It looks more like a dirty page ID comparator than a dirty page comparator.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -51,8 +53,8 @@ public class CheckpointPagesWriter implements Runnable {
     /** Checkpoint specific metrics tracker. */
     private final CheckpointMetricsTracker tracker;
 
-    /** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection. */
-    private final IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> writePageIds;
+    /** Queue of dirty page IDs to write under this task. Overall pages to write may be greater than this queue. */

Review Comment:
   When will we write more than the IDs contained in this queue? Could you please clarify this in this javadoc too?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;

Review Comment:
   `index` -> `pageMemoryIndex` ? Because there are many 'indexable' things in the context, it's easy to get lost.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */
+        private final int[] sizes;

Review Comment:
   This array contains cumulative sizes, so I suggest renaming it (e.g. to `cumulativeSizes`) to make that obvious.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@link true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);
+                }
+            }
+
+            result.index = index;
+            result.position = index > 0 ? queuePosition - sizes[index - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@link true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(sizes, index, sizes.length, position) + 1);

Review Comment:
   Can a negative value be returned here at all? If the position is greater than the sum of all sizes, we should never get here.
   
   If a negative value is not possible, let's remove `abs()` and just assert that the value is non-negative.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -63,14 +61,8 @@
  * <p>{@link CheckpointWorkflow#markCheckpointEnd} - Finalization of last checkpoint.
  */
 class CheckpointWorkflow {
-    /**
-     * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])} in case
-     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
-     */
-    private final int parallelSortThreshold;
-
-    /** This number of threads will be created and used for parallel sorting. */
-    private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
+    /** Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])}. */
+    static final int PARALLEL_SORT_THRESHOLD = 40_000;

Review Comment:
   How about adding a link to the paper which was used to derive this value? Otherwise, it would look like an arbitrary choice.



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java:
##########
@@ -181,13 +171,13 @@ void testMarkCheckpointBegin() throws Exception {
 
         CheckpointReadWriteLock readWriteLock = newReadWriteLock(log);
 
-        List<FullPageId> dirtyPages = List.of(new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0));
+        List<FullPageId> dirtyPages = List.of(of(0, 0, 0), of(0, 0, 1), of(0, 0, 2));

Review Comment:
   `of()` method name does not seem the best choice as it gives the reader no clue about what kind of objects it constructs. Could we rename it?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */
+        private final int[] sizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.sizes = sizes;
+        }
+
+        /**
+         * Returns {@link true} if the next element of the queue was obtained.
+         *
+         * @param result Holder is the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.index = 0;
+            }
+
+            int index = result.index;
+
+            if (queuePosition >= sizes[index]) {
+                if (queuePosition == sizes[index]) {
+                    index++;
+                } else {
+                    index = findDirtyPagesIndex(index, queuePosition);
+                }
+            }
+
+            result.index = index;
+            result.position = index > 0 ? queuePosition - sizes[index - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@link true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(sizes, index, sizes.length, position) + 1);
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+    }
+
+    /**
+     * View of {@link CheckpointDirtyPages} in which all dirty pages will refer to the same {@link PersistentPageMemory} and contain the
+     * same groupId and partitionId and increasing pageIdx.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesView {
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private final int index;
+
+        /** Starting position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int fromPosition;
+
+        /** End position (inclusive) of the dirty page within the element at {@link #index}. */
+        private final int toPosition;
+
+        /**
+         * Private constructor.
+         *
+         * @param index Element index in {@link CheckpointDirtyPages#dirtyPages}.
+         * @param fromPosition Starting position (inclusive) of the dirty page within the element at {@link #index}.
+         * @param toPosition End position (inclusive) of the dirty page within the element at {@link #index}.
+         */
+        private CheckpointDirtyPagesView(int index, int fromPosition, int toPosition) {
+            this.index = index;
+            this.fromPosition = fromPosition;
+            this.toPosition = toPosition;
+        }
+
+        /**
+         * Returns the dirty page by index.
+         *
+         * @param index Dirty page index.
+         */
+        public FullPageId get(int index) {
+            return dirtyPages.get(this.index).getValue().get(fromPosition + index);
+        }
+
+        /**
+         * Returns the page memory for view.
+         */
+        public PersistentPageMemory pageMemory() {
+            return dirtyPages.get(index).getKey();
+        }
+
+        /**
+         * Returns the size of the view.
+         */
+        public int size() {
+            return toPosition - fromPosition + 1;
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+
+        private boolean isToPositionLast() {
+            return toPosition == dirtyPages.get(index).getValue().size() - 1;
+        }
+    }
+
+    /**
+     * Holder is the result of getting the next dirty page in {@link CheckpointDirtyPagesQueue#next(QueueResult)}.
+     *
+     * <p>Not thread safe.
+     */
+    static class QueueResult {
+        private @Nullable CheckpointDirtyPagesQueue owner;
+
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private int index;
+
+        /** Position of the dirty page within the element at {@link #index}. */
+        private int position;
+
+        /**
+         * Returns the page memory for the associated dirty page.
+         */
+        public @Nullable PersistentPageMemory pageMemory() {
+            return owner == null ? null : owner.owner().dirtyPages.get(index).getKey();
+        }
+
+        /**
+         * Returns dirty page.
+         */
+        public @Nullable FullPageId dirtyPage() {
+            return owner == null ? null : owner.owner().dirtyPages.get(index).getValue().get(position);
+        }
+    }
+
+    private static boolean equalsByGroupAndPartition(FullPageId pageId0, FullPageId pageId1) {
+        return pageId0.groupId() == pageId1.groupId() && partitionId(pageId0.pageId()) == partitionId(pageId1.pageId());

Review Comment:
   How about introducing `FullPageId#partition()` method? It could be useful in other places as well.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -2095,6 +2095,7 @@ public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> allowToReplac
 
         safeToUpdate.set(true);
 
+        // Less memory and no need for Set features.

Review Comment:
   A couple of other things that relate to code NOT touched by this PR, but that might still be useful.
   
   1. `CollectionUtils#union()` returns a `Collection` that, it seems, might calculate size incorrectly if it unites `Set`s (if you unite `Set.of(1)` with another `Set.of(1)`, the result will report size 2, even though, for sets, it should be 1). If this is how it is intended to work, I think it makes sense to rename the method, because `union()` strongly suggests that it has 'set union' semantics for sets. Also, this behavior should be clarified in the method Javadoc.
   2. `CollectionUtils#union()` does not make copies of the original collections; it just makes a wrapper around them. This might be a surprise for a user. I suggest specifically mentioning this 'wrapping' mode in the method Javadoc to avoid confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914809193


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> dirtyPages) {
+        this(dirtyPages.isEmpty() ? List.of()
+                : dirtyPages.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for dirty page views for a specific group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int index;
+        int fromPosition;
+
+        if (currentView == null) {
+            index = 0;
+            fromPosition = 0;
+        } else {
+            index = currentView.isToPositionLast() ? currentView.index + 1 : currentView.index;
+            fromPosition = currentView.isToPositionLast() ? 0 : currentView.toPosition + 1;
+        }
+
+        if (index >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(index).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || !equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition + 1))) {
+            return new CheckpointDirtyPagesView(index, fromPosition, fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(index, fromPosition, fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Sizes each element in {@link #dirtyPages} + the previous value in this array. */

Review Comment:
   fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r914869319


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -51,8 +53,8 @@ public class CheckpointPagesWriter implements Runnable {
     /** Checkpoint specific metrics tracker. */
     private final CheckpointMetricsTracker tracker;
 
-    /** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection. */
-    private final IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> writePageIds;
+    /** Queue of dirty page IDs to write under this task. Overall pages to write may be greater than this queue. */

Review Comment:
   Try to fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] rpuch commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r915073112


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -63,14 +61,8 @@
  * <p>{@link CheckpointWorkflow#markCheckpointEnd} - Finalization of last checkpoint.
  */
 class CheckpointWorkflow {
-    /**
-     * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])} in case
-     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
-     */
-    private final int parallelSortThreshold;
-
-    /** This number of threads will be created and used for parallel sorting. */
-    private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
+    /** Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])}. */
+    static final int PARALLEL_SORT_THRESHOLD = 40_000;

Review Comment:
   Imagine that after a year or two there are 50 commits in the history of this file, and the line we are discussing is changed a few times (maybe the field is made public, the line moved, then the field is made package-local, etc.). It may become really difficult to find the commit which introduced this constant, and we need to find the commit to find the ticket. But if we put the link in a comment, there is hope that the comment will not get mutilated or removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] rpuch commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r915073848


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -379,10 +378,12 @@ boolean writePages(
 
         tracker.onPagesWriteStart();
 
+        CheckpointDirtyPagesQueue checkpointDirtyPagesQueue = checkpointDirtyPages.toQueue();

Review Comment:
   It seems to me that it would make the code a bit easier to read, but it's a matter of taste. Not a mandatory thing, for sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r913664207


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java:
##########
@@ -17,36 +17,33 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import org.apache.ignite.internal.pagememory.FullPageId;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-
 /**
  * Data class of checkpoint information.
  */
 class Checkpoint {
-    /** Checkpoint pages. */
-    final IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> dirtyPages;
+    /** Sorted dirty pages from data regions that should be checkpointed. */
+    final CheckpointDirtyPages dirtyPages;
 
     /** Checkpoint progress status. */
     final CheckpointProgressImpl progress;
 
     /** Number of dirty pages. */
-    final int dirtyPagesSize;
+    final long dirtyPagesSize;

Review Comment:
   Fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #907: IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #907:
URL: https://github.com/apache/ignite-3/pull/907#discussion_r915104558


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -63,14 +61,8 @@
  * <p>{@link CheckpointWorkflow#markCheckpointEnd} - Finalization of last checkpoint.
  */
 class CheckpointWorkflow {
-    /**
-     * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])} in case
-     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
-     */
-    private final int parallelSortThreshold;
-
-    /** This number of threads will be created and used for parallel sorting. */
-    private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
+    /** Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])}. */
+    static final int PARALLEL_SORT_THRESHOLD = 40_000;

Review Comment:
   fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org