Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/02/02 14:04:29 UTC

[GitHub] [ignite-3] ibessonov commented on a change in pull request #602: IGNITE-15814 [Native Persistence 3.0] Data Structures porting

ibessonov commented on a change in pull request #602:
URL: https://github.com/apache/ignite-3/pull/602#discussion_r797377939



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
##########
@@ -340,9 +316,58 @@ public static boolean nullOrEmpty(boolean[] arr) {
     }
 
     /**
-     * Stub.
+     * Removes an element from an array, shrinking the array length by one.
+     *
+     * @param arr Array.
+     * @param idx Index to remove.
+     * @return New array.
      */
-    private ArrayUtils() {
-        // No op.
+    public static <T> T[] remove(T[] arr, int idx) {
+        int len = arr.length;
+
+        assert idx >= 0 && idx < len : idx + " < " + len;
+
+        if (idx == len - 1) {
+            return Arrays.copyOfRange(arr, 0, len - 1);
+        }
+
+        if (idx == 0) {
+            return Arrays.copyOfRange(arr, 1, len);
+        }
+
+        T[] res = (T[]) Array.newInstance(arr.getClass().getComponentType(), len - 1);

Review comment:
       TODO: add to the list of future improvements - we should benchmark this and compare it with an implementation that uses `copyOf` + `arraycopy`, which should in theory save some time.
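
   For reference, here is a minimal sketch of the `copyOf` + `arraycopy` variant mentioned above (the class and method names are hypothetical, and it has not been benchmarked):

```java
import java.util.Arrays;

/** Hypothetical alternative to ArrayUtils.remove(T[], int); a sketch for benchmarking only. */
final class ArrayRemoveAlternative {
    static <T> T[] remove(T[] arr, int idx) {
        int len = arr.length;

        assert idx >= 0 && idx < len : idx;

        // copyOf allocates the result and copies the first len - 1 elements in one call;
        // a single arraycopy then shifts the elements after idx left by one position.
        T[] res = Arrays.copyOf(arr, len - 1);

        System.arraycopy(arr, idx + 1, res, idx, len - idx - 1);

        return res;
    }
}
```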

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
##########
@@ -57,155 +58,137 @@
     public static final Object[] OBJECT_EMPTY_ARRAY = new Object[0];
 
     /** {@code byte} array factory. */
-    public static final ArrayFactory<byte[]> BYTE_ARRAY = new ArrayFactory<>() {
-        @Override
-        public byte[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid byte array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return BYTE_EMPTY_ARRAY;
-
-                default:
-                    return new byte[len];
-            }
+    public static final ArrayFactory<byte[]> BYTE_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid byte array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return BYTE_EMPTY_ARRAY;
+
+            default:
+                return new byte[len];
         }
     };
 
     /** {@code short} array factory. */
-    public static final ArrayFactory<short[]> SHORT_ARRAY = new ArrayFactory<>() {
-        @Override
-        public short[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid short array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return SHORT_EMPTY_ARRAY;
-
-                default:
-                    return new short[len];
-            }
+    public static final ArrayFactory<short[]> SHORT_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid short array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return SHORT_EMPTY_ARRAY;
+
+            default:
+                return new short[len];
         }
     };
 
     /** {@code int} array factory. */
-    public static final ArrayFactory<int[]> INT_ARRAY = new ArrayFactory<>() {
-        @Override
-        public int[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid int array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return INT_EMPTY_ARRAY;
-
-                default:
-                    return new int[len];
-            }
+    public static final ArrayFactory<int[]> INT_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid int array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return INT_EMPTY_ARRAY;
+
+            default:
+                return new int[len];
         }
     };
 
     /** {@code long} array factory. */
-    public static final ArrayFactory<long[]> LONG_ARRAY = new ArrayFactory<>() {
-        @Override
-        public long[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid long array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return LONG_EMPTY_ARRAY;
-
-                default:
-                    return new long[len];
-            }
+    public static final ArrayFactory<long[]> LONG_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid long array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return LONG_EMPTY_ARRAY;
+
+            default:
+                return new long[len];
         }
     };
 
     /** {@code float} array factory. */
-    public static final ArrayFactory<float[]> FLOAT_ARRAY = new ArrayFactory<>() {
-        @Override
-        public float[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid float array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return FLOAT_EMPTY_ARRAY;
-
-                default:
-                    return new float[len];
-            }
+    public static final ArrayFactory<float[]> FLOAT_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid float array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return FLOAT_EMPTY_ARRAY;
+
+            default:
+                return new float[len];
         }
     };
 
     /** {@code double} array factory. */
-    public static final ArrayFactory<double[]> DOUBLE_ARRAY = new ArrayFactory<>() {
-        @Override
-        public double[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid double array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return DOUBLE_EMPTY_ARRAY;
-
-                default:
-                    return new double[len];
-            }
+    public static final ArrayFactory<double[]> DOUBLE_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid double array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return DOUBLE_EMPTY_ARRAY;
+
+            default:
+                return new double[len];
         }
     };
 
     /** {@code char} array factory. */
-    public static final ArrayFactory<char[]> CHAR_ARRAY = new ArrayFactory<>() {
-        @Override
-        public char[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid char array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return CHAR_EMPTY_ARRAY;
-
-                default:
-                    return new char[len];
-            }
+    public static final ArrayFactory<char[]> CHAR_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid char array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return CHAR_EMPTY_ARRAY;
+
+            default:
+                return new char[len];
         }
     };
 
     /** {@code boolean} array factory. */
-    public static final ArrayFactory<boolean[]> BOOLEAN_ARRAY = new ArrayFactory<>() {
-        @Override
-        public boolean[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid boolean array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return BOOLEAN_EMPTY_ARRAY;
-
-                default:
-                    return new boolean[len];
-            }
+    public static final ArrayFactory<boolean[]> BOOLEAN_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid boolean array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return BOOLEAN_EMPTY_ARRAY;
+
+            default:
+                return new boolean[len];
         }
     };
 
+    /**
+     * Stub.
+     */
+    private ArrayUtils() {

Review comment:
       Ok, what is the practical purpose of this change? Like, really, no one will ever try to instantiate this class anyway.

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/PageIo.java
##########
@@ -54,86 +55,75 @@
  * </ol>
  */
 public abstract class PageIo {
-    /**
-     * Maximum value for page type.
-     */
+    /** Maximum value for page type. */
     public static final int MAX_IO_TYPE = 65535 - 1;
 
-    /**
-     * Offset for "short" page type.
-     */
+    /** Offset for "short" page type. */
     public static final int TYPE_OFF = 0;
 
-    /**
-     * Offset for "short" page version.
-     */
+    /** Offset for "short" page version. */
     public static final int VER_OFF = TYPE_OFF + Short.BYTES;
 
-    /**
-     * Offset for "int" CRC.
-     */
+    /** Offset for "int" CRC. */
     public static final int CRC_OFF = VER_OFF + Short.BYTES;
 
-    /**
-     * Offset for "long" page ID.
-     */
+    /** Offset for "long" page ID. */
     public static final int PAGE_ID_OFF = CRC_OFF + Integer.BYTES;
 
-    /**
-     * Offset for "byte" rotated ID.
-     */
+    /** Offset for "byte" rotated ID. */
     public static final int ROTATED_ID_PART_OFF = PAGE_ID_OFF + Long.BYTES;
 
-    /**
-     * Offset for "byte" compression type.
-     */
+    /** Offset for "byte" compression type. */
     private static final int COMPRESSION_TYPE_OFF = ROTATED_ID_PART_OFF + Byte.BYTES;
 
-    /**
-     * Offset for "short" compressed size.
-     */
+    /** Offset for "short" compressed size. */
     private static final int COMPRESSED_SIZE_OFF = COMPRESSION_TYPE_OFF + Byte.BYTES;
 
-    /**
-     * Offset for "short" compacted size.
-     */
+    /** Offset for "short" compacted size. */
     private static final int COMPACTED_SIZE_OFF = COMPRESSED_SIZE_OFF + Short.BYTES;
 
-    /**
-     * Offset for reserved "short" value.
-     */
+    /** Offset for reserved "short" value. */
     private static final int RESERVED_SHORT_OFF = COMPACTED_SIZE_OFF + Short.BYTES;
 
-    /**
-     * Offset for reserved "long" value.
-     */
+    /** Offset for reserved "long" value. */
     private static final int RESERVED_2_OFF = RESERVED_SHORT_OFF + Short.BYTES;
 
-    /**
-     * Offset for reserved "long" value.
-     */
+    /** Offset for reserved "long" value. */
     private static final int RESERVED_3_OFF = RESERVED_2_OFF + Long.BYTES;
 
-    /**
-     * Total size of common header, including reserved bytes.
-     */
+    /** Total size of common header, including reserved bytes. */
     public static final int COMMON_HEADER_END = RESERVED_3_OFF + Long.BYTES;
 
-    /**
-     * IO version.
-     */
+    /* All the page types. */

Review comment:
       All references to specific page types should be removed from this class

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/AbstractFreeList.java
##########
@@ -0,0 +1,923 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ignite.internal.pagememory.freelist;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.reuse.LongListReuseBag;
+import org.apache.ignite.internal.pagememory.reuse.ReuseBag;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.util.PageHandler;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract free list.
+ */
+public abstract class AbstractFreeList<T extends Storable> extends PagesList implements FreeList<T>, ReuseList {
+    private static final int BUCKETS = 256; // Must be power of 2.
+
+    private static final int REUSE_BUCKET = BUCKETS - 1;
+
+    private static final Integer COMPLETE = Integer.MAX_VALUE;
+
+    private static final Integer FAIL_I = Integer.MIN_VALUE;
+
+    private static final Long FAIL_L = Long.MAX_VALUE;
+
+    private static final int MIN_PAGE_FREE_SPACE = 8;
+
+    /**
+     * Step between buckets in free list, measured in powers of two. For example, for page size 4096 and 256 buckets, shift is 4 and step is
+     * 16 bytes.
+     */
+    private final int shift;
+
+    /** Buckets. */
+    private final AtomicReferenceArray<Stripe[]> buckets = new AtomicReferenceArray<>(BUCKETS);
+
+    /** Onheap bucket page list caches. */
+    private final AtomicReferenceArray<PagesCache> bucketCaches = new AtomicReferenceArray<>(BUCKETS);
+
+    /** Min size for data page in bytes. */
+    private final int minSizeForDataPage;
+
+    /** Page list cache limit. */
+    @Nullable
+    private final AtomicLong pageListCacheLimit;
+
+    /** Page eviction tracker. */
+    private final PageEvictionTracker evictionTracker;
+
+    private final PageHandler<T, Boolean> updateRow = new UpdateRowHandler();
+
+    /** Write a single row on a single page. */
+    private final WriteRowHandler writeRowHnd = new WriteRowHandler();
+
+    /** Write multiple rows on a single page. */
+    private final WriteRowsHandler writeRowsHnd = new WriteRowsHandler();
+
+    private final PageHandler<ReuseBag, Long> rmvRow;
+
+    private final class UpdateRowHandler implements PageHandler<T, Boolean> {
+        /** {@inheritDoc} */
+        @Override
+        public Boolean run(
+                int cacheId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo iox,
+                T row,
+                int itemId,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            AbstractDataPageIo<T> io = (AbstractDataPageIo<T>) iox;
+
+            int rowSize = row.size();
+
+            boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize);
+
+            evictionTracker.touchPage(pageId);
+
+            return updated;
+        }
+    }
+
+    private class WriteRowHandler implements PageHandler<T, Integer> {
+        /** {@inheritDoc} */
+        @Override
+        public Integer run(
+                int cacheId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo iox,
+                T row,
+                int written,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            written = addRow(pageId, pageAddr, iox, row, written);
+
+            putPage(((AbstractDataPageIo<Storable>) iox).getFreeSpace(pageAddr), pageId, pageAddr, statHolder);
+
+            return written;
+        }
+
+        /**
+         * Writes row to data page.
+         *
+         * @param pageId Page ID.
+         * @param pageAddr Page address.
+         * @param iox IO.
+         * @param row Row to write.
+         * @param written Written size.
+         * @return Number of bytes written, or {@link #COMPLETE} if the row was fully written.
+         * @throws IgniteInternalCheckedException If failed.
+         */
+        protected Integer addRow(
+                long pageId,
+                long pageAddr,
+                PageIo iox,
+                T row,
+                int written
+        ) throws IgniteInternalCheckedException {
+            AbstractDataPageIo<T> io = (AbstractDataPageIo<T>) iox;
+
+            int rowSize = row.size();
+            int oldFreeSpace = io.getFreeSpace(pageAddr);
+
+            assert oldFreeSpace > 0 : oldFreeSpace;
+
+            // If the full row does not fit into this page write only a fragment.
+            written = (written == 0 && oldFreeSpace >= rowSize) ? addRowFull(pageId, pageAddr, io, row, rowSize) :
+                    addRowFragment(pageId, pageAddr, io, row, written, rowSize);
+
+            if (written == rowSize) {
+                evictionTracker.touchPage(pageId);
+            }
+
+            // Avoid boxing with garbage generation for usual case.
+            return written == rowSize ? COMPLETE : written;
+        }
+
+        /**
+         * Adds row to this data page and sets respective link to the given row object.
+         *
+         * @param pageId Page ID.
+         * @param pageAddr Page address.
+         * @param io IO.
+         * @param row Row.
+         * @param rowSize Row size.
+         * @return Written size which is always equal to row size here.
+         * @throws IgniteInternalCheckedException If failed.
+         */
+        protected int addRowFull(
+                long pageId,
+                long pageAddr,
+                AbstractDataPageIo<T> io,
+                T row,
+                int rowSize
+        ) throws IgniteInternalCheckedException {
+            io.addRow(pageId, pageAddr, row, rowSize, pageSize());
+
+            return rowSize;
+        }
+
+        /**
+         * Adds maximum possible fragment of the given row to this data page and sets respective link to the row.
+         *
+         * @param pageId Page ID.
+         * @param pageAddr Page address.
+         * @param io IO.
+         * @param row Row.
+         * @param written Written size.
+         * @param rowSize Row size.
+         * @return Updated written size.
+         * @throws IgniteInternalCheckedException If failed.
+         */
+        protected int addRowFragment(
+                long pageId,
+                long pageAddr,
+                AbstractDataPageIo<T> io,
+                T row,
+                int written,
+                int rowSize
+        ) throws IgniteInternalCheckedException {
+            int payloadSize = io.addRowFragment(pageMem, pageId, pageAddr, row, written, rowSize, pageSize());
+
+            assert payloadSize > 0 : payloadSize;
+
+            return written + payloadSize;
+        }
+
+        /**
+         * Put page into the free list if needed.
+         *
+         * @param freeSpace Page free space.
+         * @param pageId Page ID.
+         * @param pageAddr Page address.
+         * @param statHolder Statistics holder to track IO operations.
+         */
+        protected void putPage(
+                int freeSpace,
+                long pageId,
+                long pageAddr,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            if (freeSpace > MIN_PAGE_FREE_SPACE) {
+                int bucket = bucket(freeSpace, false);
+
+                put(null, pageId, pageAddr, bucket, statHolder);
+            }
+        }
+    }
+
+    private final class WriteRowsHandler implements PageHandler<IgniteCursor<T>, Integer> {
+        /** {@inheritDoc} */
+        @Override
+        public Integer run(
+                int cacheId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo iox,
+                IgniteCursor<T> cur,
+                int written,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            AbstractDataPageIo<T> io = (AbstractDataPageIo<T>) iox;
+
+            // Fill the page up to the end.
+            while (written != COMPLETE || (!evictionTracker.evictionRequired() && cur.next())) {
+                T row = cur.get();
+
+                if (written == COMPLETE) {
+                    // If the data row was completely written without remainder, proceed to the next.
+                    if ((written = writeWholePages(row, statHolder)) == COMPLETE) {
+                        continue;
+                    }
+
+                    if (io.getFreeSpace(pageAddr) < row.size() - written) {
+                        break;
+                    }
+                }
+
+                written = writeRowHnd.addRow(pageId, pageAddr, io, row, written);
+
+                assert written == COMPLETE;
+
+                evictionTracker.touchPage(pageId);
+            }
+
+            writeRowHnd.putPage(io.getFreeSpace(pageAddr), pageId, pageAddr, statHolder);
+
+            return written;
+        }
+    }
+
+    private final class RemoveRowHandler implements PageHandler<ReuseBag, Long> {
+        /** Indicates whether partition ID should be masked from page ID. */
+        private final boolean maskPartId;
+
+        /**
+         * Constructor.
+         *
+         * @param maskPartId Indicates whether partition ID should be masked from page ID.
+         */
+        RemoveRowHandler(boolean maskPartId) {
+            this.maskPartId = maskPartId;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Long run(
+                int cacheId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo iox,
+                ReuseBag reuseBag,
+                int itemId,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            AbstractDataPageIo<T> io = (AbstractDataPageIo<T>) iox;
+
+            int oldFreeSpace = io.getFreeSpace(pageAddr);
+
+            assert oldFreeSpace >= 0 : oldFreeSpace;
+
+            long nextLink = io.removeRow(pageAddr, itemId, pageSize());
+
+            int newFreeSpace = io.getFreeSpace(pageAddr);
+
+            if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
+                int newBucket = bucket(newFreeSpace, false);
+
+                boolean putIsNeeded = oldFreeSpace <= MIN_PAGE_FREE_SPACE;
+
+                if (!putIsNeeded) {
+                    int oldBucket = bucket(oldFreeSpace, false);
+
+                    if (oldBucket != newBucket) {
+                        // It is possible that page was concurrently taken for put, in this case put will handle bucket change.
+                        pageId = maskPartId ? PageIdUtils.maskPartitionId(pageId) : pageId;
+
+                        putIsNeeded = removeDataPage(pageId, pageAddr, io, oldBucket, statHolder);
+                    }
+                }
+
+                if (io.isEmpty(pageAddr)) {
+                    evictionTracker.forgetPage(pageId);
+
+                    if (putIsNeeded) {
+                        reuseBag.addFreePage(recyclePage(pageId, pageAddr));
+                    }
+                } else if (putIsNeeded) {
+                    put(null, pageId, pageAddr, newBucket, statHolder);
+                }
+            }
+
+            // For common case boxed 0L will be cached inside of Long, so no garbage will be produced.
+            return nextLink;
+        }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param grpId Group ID.
+     * @param name Structure name (for debug purpose).
+     * @param pageMem Page memory.
+     * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself.
+     * @param lockLsnr Page lock listener.
+     * @param defaultPageFlag Default flag value for allocated pages. One of {@link PageIdAllocator#FLAG_DATA} or {@link
+     * PageIdAllocator#FLAG_AUX}.
+     * @param log Logger.
+     * @param metaPageId Metadata page ID.
+     * @param initNew {@code True} if new metadata should be initialized.
+     * @param pageListCacheLimit Page list cache limit.
+     * @param evictionTracker Page eviction tracker.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public AbstractFreeList(
+            int grpId,
+            String name,
+            PageMemory pageMem,
+            @Nullable ReuseList reuseList,
+            PageLockListener lockLsnr,
+            byte defaultPageFlag,
+            IgniteLogger log,
+            long metaPageId,
+            boolean initNew,
+            @Nullable AtomicLong pageListCacheLimit,
+            PageEvictionTracker evictionTracker
+    ) throws IgniteInternalCheckedException {
+        super(
+                grpId,
+                name,
+                pageMem,
+                lockLsnr,
+                defaultPageFlag,
+                log,
+                BUCKETS,
+                metaPageId
+        );
+
+        this.evictionTracker = evictionTracker;
+        this.pageListCacheLimit = pageListCacheLimit;
+
+        this.reuseList = reuseList == null ? this : reuseList;
+
+        rmvRow = new RemoveRowHandler(grpId == 0);
+
+        int pageSize = pageMem.pageSize();
+
+        assert isPow2(pageSize) : "Page size must be a power of 2: " + pageSize;
+        assert isPow2(BUCKETS);
+        assert BUCKETS <= pageSize : pageSize;
+
+        // TODO this constant is used because currently we cannot reuse data pages as index pages

Review comment:
       What the hell is this =)
   I know it's not yours, but we should link a JIRA issue here as well.

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datastructure;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.MAX_ITEMID_NUM;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.flag;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.toDetailString;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.reuse.ReuseBag;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.util.PageHandler;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base class for all the data structures based on {@link PageMemory}.
+ */
+public abstract class DataStructure {
+    /** For tests. */
+    public static Random rnd;

Review comment:
       This will have to be removed in the future - no more special fields for tests. Please add a TODO. I'd rather have a
   `private final IntUnaryOperator rng;` field with a corresponding constructor parameter.
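
   A minimal sketch of the suggested shape, assuming an injectable generator passed through the constructor (class and method names here are illustrative, not the actual DataStructure API):

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntUnaryOperator;

/** Illustration of an injectable random generator replacing the static Random test field. */
public abstract class RandomizedDataStructure {
    /** Produces a value in [0, bound); tests can pass a deterministic operator. */
    private final IntUnaryOperator rng;

    protected RandomizedDataStructure(IntUnaryOperator rng) {
        // Fall back to ThreadLocalRandom when nothing is injected (production path).
        this.rng = rng != null ? rng : bound -> ThreadLocalRandom.current().nextInt(bound);
    }

    /** Returns a random value in [0, bound), using the injected generator. */
    protected final int randomInt(int bound) {
        return rng.applyAsInt(bound);
    }
}
```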

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
##########
@@ -340,9 +316,58 @@ public static boolean nullOrEmpty(boolean[] arr) {
     }
 
     /**
-     * Stub.
+     * Removes an element from an array, shrinking the array length by one.
+     *
+     * @param arr Array.
+     * @param idx Index to remove.
+     * @return New array.
      */
-    private ArrayUtils() {
-        // No op.
+    public static <T> T[] remove(T[] arr, int idx) {
+        int len = arr.length;
+
+        assert idx >= 0 && idx < len : idx + " < " + len;
+
+        if (idx == len - 1) {
+            return Arrays.copyOfRange(arr, 0, len - 1);
+        }
+
+        if (idx == 0) {
+            return Arrays.copyOfRange(arr, 1, len);
+        }
+
+        T[] res = (T[]) Array.newInstance(arr.getClass().getComponentType(), len - 1);
+
+        System.arraycopy(arr, 0, res, 0, idx);
+        System.arraycopy(arr, idx + 1, res, idx, len - idx - 1);
+
+        return res;
+    }
+
+    /**
+     * Removes an element from an array, shrinking the array length by one.
+     *
+     * @param arr Array.
+     * @param idx Index to remove.
+     * @return New array.
+     */
+    public static long[] remove(long[] arr, int idx) {
+        int len = arr.length;
+
+        assert idx >= 0 && idx < len : idx + " < " + len;
+
+        if (idx == len - 1) {
+            return Arrays.copyOfRange(arr, 0, len - 1);
+        }
+
+        if (idx == 0) {
+            return Arrays.copyOfRange(arr, 1, len);
+        }
+
+        long[] res = new long[len - 1];

Review comment:
       Same here

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteLongList.java
##########
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteStringBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Minimal list API to work with primitive longs. This list exists to avoid boxing/unboxing when using standard list from Java.
+ */
+public class IgniteLongList implements Externalizable {

Review comment:
       We should consider replacing it with `it.unimi.dsi.fastutil.longs.LongArrayList`, for example. We should add this to the list of future improvements.
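
   For context, a rough sketch of what the fastutil replacement could look like (this assumes the `it.unimi.dsi.fastutil` dependency were added, which is not the case today):

```java
import it.unimi.dsi.fastutil.longs.LongArrayList;

/** Illustration only: fastutil stores primitive longs without boxing, much like IgniteLongList. */
public class LongListExample {
    public static void main(String[] args) {
        LongArrayList list = new LongArrayList();

        list.add(42L);                       // primitive add, no boxing
        long first = list.getLong(0);        // primitive read
        long[] asArray = list.toLongArray(); // copy out as a primitive array

        System.out.println(first + ", " + asArray.length);
    }
}
```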

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
##########
@@ -57,155 +58,137 @@
     public static final Object[] OBJECT_EMPTY_ARRAY = new Object[0];
 
     /** {@code byte} array factory. */
-    public static final ArrayFactory<byte[]> BYTE_ARRAY = new ArrayFactory<>() {
-        @Override
-        public byte[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid byte array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return BYTE_EMPTY_ARRAY;
-
-                default:
-                    return new byte[len];
-            }
+    public static final ArrayFactory<byte[]> BYTE_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid byte array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return BYTE_EMPTY_ARRAY;
+
+            default:
+                return new byte[len];
         }
     };
 
     /** {@code short} array factory. */
-    public static final ArrayFactory<short[]> SHORT_ARRAY = new ArrayFactory<>() {
-        @Override
-        public short[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid short array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return SHORT_EMPTY_ARRAY;
-
-                default:
-                    return new short[len];
-            }
+    public static final ArrayFactory<short[]> SHORT_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid short array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return SHORT_EMPTY_ARRAY;
+
+            default:
+                return new short[len];
         }
     };
 
     /** {@code int} array factory. */
-    public static final ArrayFactory<int[]> INT_ARRAY = new ArrayFactory<>() {
-        @Override
-        public int[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid int array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return INT_EMPTY_ARRAY;
-
-                default:
-                    return new int[len];
-            }
+    public static final ArrayFactory<int[]> INT_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid int array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return INT_EMPTY_ARRAY;
+
+            default:
+                return new int[len];
         }
     };
 
     /** {@code long} array factory. */
-    public static final ArrayFactory<long[]> LONG_ARRAY = new ArrayFactory<>() {
-        @Override
-        public long[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid long array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return LONG_EMPTY_ARRAY;
-
-                default:
-                    return new long[len];
-            }
+    public static final ArrayFactory<long[]> LONG_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid long array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return LONG_EMPTY_ARRAY;
+
+            default:
+                return new long[len];
         }
     };
 
     /** {@code float} array factory. */
-    public static final ArrayFactory<float[]> FLOAT_ARRAY = new ArrayFactory<>() {
-        @Override
-        public float[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid float array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return FLOAT_EMPTY_ARRAY;
-
-                default:
-                    return new float[len];
-            }
+    public static final ArrayFactory<float[]> FLOAT_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid float array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return FLOAT_EMPTY_ARRAY;
+
+            default:
+                return new float[len];
         }
     };
 
     /** {@code double} array factory. */
-    public static final ArrayFactory<double[]> DOUBLE_ARRAY = new ArrayFactory<>() {
-        @Override
-        public double[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid double array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return DOUBLE_EMPTY_ARRAY;
-
-                default:
-                    return new double[len];
-            }
+    public static final ArrayFactory<double[]> DOUBLE_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid double array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return DOUBLE_EMPTY_ARRAY;
+
+            default:
+                return new double[len];
         }
     };
 
     /** {@code char} array factory. */
-    public static final ArrayFactory<char[]> CHAR_ARRAY = new ArrayFactory<>() {
-        @Override
-        public char[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid char array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return CHAR_EMPTY_ARRAY;
-
-                default:
-                    return new char[len];
-            }
+    public static final ArrayFactory<char[]> CHAR_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid char array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return CHAR_EMPTY_ARRAY;
+
+            default:
+                return new char[len];
         }
     };
 
     /** {@code boolean} array factory. */
-    public static final ArrayFactory<boolean[]> BOOLEAN_ARRAY = new ArrayFactory<>() {
-        @Override
-        public boolean[] of(int len) {
-            if (len < 0) {
-                throw new IgniteInternalException("Read invalid boolean array length: " + len);
-            }
-
-            switch (len) {
-                case 0:
-                    return BOOLEAN_EMPTY_ARRAY;
-
-                default:
-                    return new boolean[len];
-            }
+    public static final ArrayFactory<boolean[]> BOOLEAN_ARRAY = len -> {
+        if (len < 0) {
+            throw new IgniteInternalException("Read invalid boolean array length: " + len);
+        }
+
+        switch (len) {
+            case 0:
+                return BOOLEAN_EMPTY_ARRAY;
+
+            default:
+                return new boolean[len];
         }
     };
 
+    /**
+     * Stub.
+     */
+    private ArrayUtils() {
+        // No op.
+    }
+
     /**
      * Returns {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
      *
      * @param arr Array to check.
      * @param <T> Array element type.
-     * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.

Review comment:
       I would also like to have an explanation for these removed sections. Removing them seems completely unnecessary.

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/PageIoRegistry.java
##########
@@ -33,58 +33,77 @@
     /**
      * Loads all {@link IoVersions} from a {@link PageIoModule} using the {@link ServiceLoader} mechanism.
      *
-     * @throws IllegalStateException If there's an invalid page type or several different {@link IoVersions} instances for the same type.
+     * @return {@code this} for code chaining.
+     * @throws IllegalStateException If there's an invalid page type or several {@link IoVersions} instances for the same type.
      */
-    public void loadFromServiceLoader() {
+    public PageIoRegistry loadAllFromServiceLoader() {
         ServiceLoader<PageIoModule> serviceLoader = ServiceLoader.load(PageIoModule.class);
 
         for (PageIoModule pageIoModule : serviceLoader) {
-            for (IoVersions<?> ios : pageIoModule.ioVersions()) {
-                if (ios.getType() == 0) {
-                    throw new IllegalStateException("Type 0 is reserved and can't be used: " + ios);
-                }
+            for (IoVersions<?> ioVersions : pageIoModule.ioVersions()) {
+                load(ioVersions);
+            }
+        }
 
-                if (ioVersions[ios.getType()] != null && !ioVersions[ios.getType()].equals(ios)) {
-                    throw new IllegalStateException("Duplicated IOVersions found: " + ios);
-                }
+        return this;
+    }
 
-                ioVersions[ios.getType()] = ios;
-            }
+    /**
+     * Loads the given {@link IoVersions} instances.
+     *
+     * @return {@code this} for code chaining.
+     * @throws IllegalStateException If there's an invalid page type or several {@link IoVersions} instances for the same type.
+     */
+    public PageIoRegistry load(IoVersions<?>... versions) {

Review comment:
       I don't really understand why you need this method. If it's for tests only then I'd recommend removing it.

##########
File path: modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/PageIoRegistryTest.java
##########
@@ -44,7 +46,7 @@ void testResolve() throws Exception {
         assertThrows(IgniteInternalCheckedException.class, () -> ioRegistry.resolve(TEST_PAGE_TYPE, TEST_PAGE_VER));
 
         // Load all PageIOModule-s from the classpath.
-        ioRegistry.loadFromServiceLoader();
+        ioRegistry.loadAllFromServiceLoader();

Review comment:
       Why did you rename this method? What is "all"?

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/io/PagesListNodeIo.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.freelist.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.maskPartitionId;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.copyMemory;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getShort;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.lang.IgniteStringBuilder;
+
+/**
+ * Pages list node IO.
+ * TODO optimize: now we have slow {@link #removePage(long, long)}.

Review comment:
       Please add a link to some issue

##########
File path: modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/freelist/TestDataPageIo.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.freelist;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.lang.IgniteStringBuilder;
+
+/**
+ * Test DataPageIo for {@link TestDataRow}.
+ */
+class TestDataPageIo extends AbstractDataPageIo<TestDataRow> {
+    /** I/O versions. */
+    static final IoVersions<TestDataPageIo> VERSIONS = new IoVersions<>(new TestDataPageIo());
+
+    /**
+     * Private constructor.
+     */
+    private TestDataPageIo() {
+        super(T_DATA, 1);

Review comment:
       I believe that we need a reserved segment of page types specifically for tests. The current usage of the constant from PageIo is just wrong.
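
   One possible shape for such a reservation, purely as a sketch (the interface name and the concrete values are assumptions, not an existing PageIo contract):

```java
/** Hypothetical reserved range of page types for tests; values would need to be agreed on next to PageIo's type constants. */
public interface TestPageTypes {
    /** First page type reserved for tests (assumed to sit near the top of the 16-bit type space). */
    int T_TEST_FIRST = 60_000;

    /** Example test page type, used instead of reusing a production constant like T_DATA. */
    int T_TEST_DATA = T_TEST_FIRST;
}
```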

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java
##########
@@ -0,0 +1,2172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.freelist;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.io.PageIo.T_DATA;
+import static org.apache.ignite.internal.pagememory.io.PageIo.T_DATA_PART;
+import static org.apache.ignite.internal.pagememory.io.PageIo.T_META;
+import static org.apache.ignite.internal.pagememory.io.PageIo.T_PAGE_LIST_NODE;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.MAX_ITEMID_NUM;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.changeType;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.ArrayUtils.remove;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.datastructure.DataStructure;
+import org.apache.ignite.internal.pagememory.freelist.io.PagesListMetaIo;
+import org.apache.ignite.internal.pagememory.freelist.io.PagesListNodeIo;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.reuse.ReuseBag;
+import org.apache.ignite.internal.pagememory.util.PageHandler;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteLongList;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Striped doubly-linked list of page IDs optionally organized in buckets.
+ */
+public abstract class PagesList extends DataStructure {
+    /** Count of tries to lock a stripe before falling back to a blocking lock. */
+    public static final String IGNITE_PAGES_LIST_TRY_LOCK_ATTEMPTS = "IGNITE_PAGES_LIST_TRY_LOCK_ATTEMPTS";
+
+    /** Maximum count of the stripes. */
+    public static final String IGNITE_PAGES_LIST_STRIPES_PER_BUCKET = "IGNITE_PAGES_LIST_STRIPES_PER_BUCKET";
+
+    /**
+     * Disables onheap caching of page lists (free lists and reuse lists). If persistence is enabled, changes to page lists are not stored to
+     * page memory immediately; they are cached in an onheap buffer and flushed to page memory on a checkpoint. This property allows
+     * disabling such onheap caching.
+     */
+    public static final String IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING = "IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING";
+
+    private final int tryLockAttempts = getInteger(IGNITE_PAGES_LIST_TRY_LOCK_ATTEMPTS, 10);
+
+    private final int maxStripesPerBucket = getInteger(
+            IGNITE_PAGES_LIST_STRIPES_PER_BUCKET,
+            Math.max(8, Runtime.getRuntime().availableProcessors())
+    );
+
+    private final boolean pagesListCachingDisabled = getBoolean(IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING, false);
+
+    /** Logger. */
+    protected final IgniteLogger log;
+
+    /** Bucket sizes. */
+    protected final AtomicLongArray bucketsSize;
+
+    /** Flag indicating that the {@link PagesList} has been changed. */
+    protected volatile boolean changed;
+
+    /** Page cache changed. */
+    protected volatile boolean pageCacheChanged;
+
+    /** Page ID to store list metadata. */
+    private final long metaPageId;
+
+    /** Number of buckets. */
+    private final int buckets;
+
+    /** Flag to enable/disable onheap list caching. */
+    private volatile boolean onheapListCachingEnabled;
+
+    private final PageHandler<Void, Boolean> cutTail = new CutTail();
+
+    private final PageHandler<Void, Boolean> putBucket = new PutBucket();
+
+    private final class CutTail implements PageHandler<Void, Boolean> {
+        /** {@inheritDoc} */
+        @Override
+        public Boolean run(
+                int cacheId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo iox,
+                Void ignore,
+                int bucket,
+                IoStatisticsHolder statHolder
+        ) {
+            assert getPageId(pageAddr) == pageId;
+
+            PagesListNodeIo io = (PagesListNodeIo) iox;
+
+            long tailId = io.getNextId(pageAddr);
+
+            assert tailId != 0;
+
+            io.setNextId(pageAddr, 0L);
+
+            updateTail(bucket, tailId, pageId);
+
+            return TRUE;
+        }
+    }
+
+    private final class PutBucket implements PageHandler<Void, Boolean> {
+        /** {@inheritDoc} */
+        @Override
+        public Boolean run(
+                int cacheId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo iox,
+                Void ignore,
+                int oldBucket,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            decrementBucketSize(oldBucket);
+
+            // Recalculate bucket because page free space can be changed concurrently.
+            int freeSpace = ((AbstractDataPageIo<Storable>) iox).getFreeSpace(pageAddr);
+
+            int newBucket = getBucketIndex(freeSpace);
+
+            if (newBucket != oldBucket && log.isDebugEnabled()) {
+                log.debug("Bucket changed when moving from heap to PageMemory [list=" + name() + ", oldBucket=" + oldBucket
+                        + ", newBucket=" + newBucket + ", pageId=" + pageId + ']');
+            }
+
+            if (newBucket >= 0) {
+                put(null, pageId, pageAddr, newBucket, statHolder);
+            }
+
+            return TRUE;
+        }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param grpId Group ID.
+     * @param name Structure name (for debug purpose).
+     * @param pageMem Page memory.
+     * @param lockLsnr Page lock listener.
+     * @param defaultPageFlag Default flag value for allocated pages. One of {@link PageIdAllocator#FLAG_DATA} or {@link
+     * PageIdAllocator#FLAG_AUX}.
+     * @param log Logger.
+     * @param buckets Number of buckets.
+     * @param metaPageId Metadata page ID.
+     */
+    protected PagesList(
+            int grpId,
+            String name,
+            PageMemory pageMem,
+            PageLockListener lockLsnr,
+            byte defaultPageFlag,
+            IgniteLogger log,
+            int buckets,
+            long metaPageId
+    ) {
+        super(name, grpId, null, pageMem, lockLsnr, defaultPageFlag);
+
+        this.log = log;
+
+        this.buckets = buckets;
+        this.metaPageId = metaPageId;
+
+        onheapListCachingEnabled = isCachingApplicable();
+
+        bucketsSize = new AtomicLongArray(buckets);
+    }
+
+    /**
+     * Initializes a {@link PagesList}.
+     *
+     * @param metaPageId Metadata page ID.
+     * @param initNew {@code True} if a new list is created, {@code false} if it should be initialized from metadata.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    protected final void init(long metaPageId, boolean initNew) throws IgniteInternalCheckedException {
+        if (metaPageId != 0L) {
+            if (initNew) {
+                init(metaPageId, PagesListMetaIo.VERSIONS.latest());
+            } else {
+                Map<Integer, IgniteLongList> bucketsData = new HashMap<>();
+
+                long nextId = metaPageId;
+
+                while (nextId != 0) {
+                    final long pageId = nextId;
+                    final long page = acquirePage(pageId, IoStatisticsHolderNoOp.INSTANCE);
+
+                    try {
+                        long pageAddr = readLock(pageId, page); // No concurrent recycling on init.
+
+                        assert pageAddr != 0L;
+
+                        try {
+                            PagesListMetaIo io = PagesListMetaIo.VERSIONS.forPage(pageAddr);
+
+                            io.getBucketsData(pageAddr, bucketsData);
+
+                            nextId = io.getNextMetaPageId(pageAddr);
+
+                            assert nextId != pageId : "Loop detected [next=" + hexLong(nextId) + ", cur=" + hexLong(pageId) + ']';
+                        } finally {
+                            readUnlock(pageId, page, pageAddr);
+                        }
+                    } finally {
+                        releasePage(pageId, page);
+                    }
+                }
+
+                for (Map.Entry<Integer, IgniteLongList> e : bucketsData.entrySet()) {
+                    int bucket = e.getKey();
+                    long bucketSize = 0;
+
+                    Stripe[] old = getBucket(bucket);
+                    assert old == null;
+
+                    long[] upd = e.getValue().array();
+
+                    Stripe[] tails = new Stripe[upd.length];
+
+                    for (int i = 0; i < upd.length; i++) {
+                        long tailId = upd[i];
+
+                        long prevId = tailId;
+                        int cnt = 0;
+
+                        while (prevId != 0L) {
+                            final long pageId = prevId;
+                            final long page = acquirePage(pageId, IoStatisticsHolderNoOp.INSTANCE);
+                            try {
+                                long pageAddr = readLock(pageId, page);
+
+                                assert pageAddr != 0L;
+
+                                try {
+                                    PagesListNodeIo io = PagesListNodeIo.VERSIONS.forPage(pageAddr);
+
+                                    cnt += io.getCount(pageAddr);
+                                    prevId = io.getPreviousId(pageAddr);
+
+                                    // In reuse bucket the page itself can be used as a free page.
+                                    if (isReuseBucket(bucket) && prevId != 0L) {
+                                        cnt++;
+                                    }
+                                } finally {
+                                    readUnlock(pageId, page, pageAddr);
+                                }
+                            } finally {
+                                releasePage(pageId, page);
+                            }
+                        }
+
+                        Stripe stripe = new Stripe(tailId, cnt == 0);
+                        tails[i] = stripe;
+                        bucketSize += cnt;
+                    }
+
+                    boolean ok = casBucket(bucket, null, tails);
+
+                    assert ok;
+
+                    bucketsSize.set(bucket, bucketSize);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns {@code True} if onheap caching is applicable for this pages list, or {@code false} if caching is disabled explicitly by a
+     * system property or if the pages list belongs to an in-memory data region (in that case onheap caching makes no sense).
+     */
+    private boolean isCachingApplicable() {
+        return !pagesListCachingDisabled;
+    }
+
+    /**
+     * Saves metadata without taking an exclusive lock on it.
+     *
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void saveMetadata(IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
+        long nextPageId = metaPageId;
+
+        assert nextPageId != 0;
+
+        flushBucketsCache(statHolder);
+
+        if (!changed) {
+            return;
+        }
+
+        // This guarantees that any concurrent changes of the list will be detected.
+        changed = false;
+
+        try {
+            long unusedPageId = writeFreeList(nextPageId);
+
+            markUnusedPagesDirty(unusedPageId);
+        } catch (Throwable e) {
+            changed = true; // Restore the changed flag due to the exception.
+
+            throw e;
+        }
+    }
+
+    /**
+     * Flush onheap cached pages lists to page memory.
+     *
+     * @param statHolder Statistic holder.
+     * @throws IgniteInternalCheckedException If failed to write a page.
+     */
+    private void flushBucketsCache(IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
+        if (!isCachingApplicable() || !pageCacheChanged) {
+            return;
+        }
+
+        pageCacheChanged = false;
+
+        onheapListCachingEnabled = false;
+
+        int lockedPages = 0;
+
+        try {
+            for (int bucket = 0; bucket < buckets; bucket++) {
+                PagesCache pagesCache = getBucketCache(bucket, false);
+
+                if (pagesCache == null) {
+                    continue;
+                }
+
+                IgniteLongList pages = pagesCache.flush();
+
+                if (pages != null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Move pages from heap to PageMemory [list=" + name() + ", bucket=" + bucket
+                                + ", pages=" + pages + ']');
+                    }
+
+                    for (int i = 0; i < pages.size(); i++) {
+                        long pageId = pages.get(i);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Move page from heap to PageMemory [list=" + name() + ", bucket=" + bucket
+                                    + ", pageId=" + pageId + ']');
+                        }
+
+                        Boolean res = write(pageId, putBucket, bucket, null, statHolder);
+
+                        if (res == null) {
+                            // Return the page to the onheap pages list if we can't lock it.
+                            pagesCache.add(pageId);
+
+                            lockedPages++;
+                        }
+                    }
+                }
+            }
+        } finally {
+            onheapListCachingEnabled = true;
+        }
+
+        if (lockedPages != 0) {
+            if (log.isInfoEnabled()) {
+                log.info("Several pages were locked and weren't flushed on disk [grp=" + grpName
+                        + ", lockedPages=" + lockedPages + ']');
+            }
+
+            pageCacheChanged = true;
+        }
+    }
+
+    /**
+     * Write free list data to page memory.
+     *
+     * @param nextPageId First free page id.
+     * @return Unused free list page id.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private long writeFreeList(long nextPageId) throws IgniteInternalCheckedException {
+        long curId = 0L;
+        long curPage = 0L;
+        long curAddr = 0L;
+
+        PagesListMetaIo curIo = null;
+
+        try {
+            for (int bucket = 0; bucket < buckets; bucket++) {
+                Stripe[] tails = getBucket(bucket);
+
+                if (tails != null) {
+                    int tailIdx = 0;
+
+                    while (tailIdx < tails.length) {
+                        int written = curPage != 0L ? curIo.addTails(pageMem.realPageSize(grpId), curAddr, bucket, tails, tailIdx) : 0;
+
+                        if (written == 0) {
+                            if (nextPageId == 0L) {
+                                nextPageId = allocatePageNoReuse();
+
+                                if (curPage != 0L) {
+                                    curIo.setNextMetaPageId(curAddr, nextPageId);
+
+                                    releaseAndWriteUnlock(curId, curPage, curAddr);
+                                }
+
+                                curId = nextPageId;
+                                curPage = acquirePage(curId, IoStatisticsHolderNoOp.INSTANCE);
+                                curAddr = writeLock(curId, curPage);
+
+                                curIo = PagesListMetaIo.VERSIONS.latest();
+
+                                curIo.initNewPage(curAddr, curId, pageSize());
+                            } else {
+                                releaseAndWriteUnlock(curId, curPage, curAddr);
+
+                                curId = nextPageId;
+                                curPage = acquirePage(curId, IoStatisticsHolderNoOp.INSTANCE);
+                                curAddr = writeLock(curId, curPage);
+
+                                curIo = PagesListMetaIo.VERSIONS.forPage(curAddr);
+
+                                curIo.resetCount(curAddr);
+                            }
+
+                            nextPageId = curIo.getNextMetaPageId(curAddr);
+                        } else {
+                            tailIdx += written;
+                        }
+                    }
+                }
+            }
+        } finally {
+            releaseAndWriteUnlock(curId, curPage, curAddr);
+        }
+
+        return nextPageId;
+    }
+
+    /**
+     * Mark unused pages as dirty.
+     *
+     * @param nextPageId First unused page.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private void markUnusedPagesDirty(long nextPageId) throws IgniteInternalCheckedException {
+        while (nextPageId != 0L) {
+            long pageId = nextPageId;
+
+            long page = acquirePage(pageId, IoStatisticsHolderNoOp.INSTANCE);
+            try {
+                long pageAddr = writeLock(pageId, page);
+
+                try {
+                    PagesListMetaIo io = PagesListMetaIo.VERSIONS.forPage(pageAddr);
+
+                    io.resetCount(pageAddr);
+
+                    nextPageId = io.getNextMetaPageId(pageAddr);
+                } finally {
+                    writeUnlock(pageId, page, pageAddr, true);
+                }
+            } finally {
+                releasePage(pageId, page);
+            }
+        }
+    }
+
+    /**
+     * Releases the page and the write lock acquired on it.
+     *
+     * @param pageId Page ID.
+     * @param page Page absolute pointer.
+     * @param pageAddr Page address.
+     */
+    private void releaseAndWriteUnlock(long pageId, long page, long pageAddr) {
+        if (page != 0L) {
+            try {
+                writeUnlock(pageId, page, pageAddr, true);
+            } finally {
+                releasePage(pageId, page);
+            }
+        }
+    }
+
+    /**
+     * Gets the bucket index by the page free space.
+     *
+     * @param freeSpace Page free space.
+     * @return Bucket index, or -1 if the page doesn't belong to any bucket.
+     */
+    protected abstract int getBucketIndex(int freeSpace);
+
+    /**
+     * Returns bucket.
+     *
+     * @param bucket Bucket index.
+     */
+    protected abstract @Nullable Stripe[] getBucket(int bucket);
+
+    /**
+     * Sets a bucket through the "compare and set" operation.
+     *
+     * @param bucket Bucket index.
+     * @param exp Expected bucket.
+     * @param upd Updated bucket.
+     * @return {@code true} If succeeded.
+     */
+    protected abstract boolean casBucket(int bucket, Stripe[] exp, Stripe[] upd);
+
+    /**
+     * Returns {@code true} if it is a reuse bucket.
+     *
+     * @param bucket Bucket index.
+     */
+    protected abstract boolean isReuseBucket(int bucket);
+
+    /**
+     * Returns the bucket cache.
+     *
+     * @param bucket Bucket index.
+     * @param create Create if missing.
+     */
+    protected abstract @Nullable PagesCache getBucketCache(int bucket, boolean create);
+
+    /**
+     * Initializes a new {@link PagesListNodeIo} page and links it to the previous one.
+     *
+     * @param io IO.
+     * @param prevId Previous page ID.
+     * @param prev Previous page buffer.
+     * @param nextId Next page ID.
+     * @param next Next page buffer.
+     */
+    private void setupNextPage(PagesListNodeIo io, long prevId, long prev, long nextId, long next) {
+        assert io.getNextId(prev) == 0L;
+
+        io.initNewPage(next, nextId, pageSize());
+        io.setPreviousId(next, prevId);
+
+        io.setNextId(prev, nextId);
+    }
+
+    /**
+     * Adds stripe to the given bucket.
+     *
+     * @param bucket Bucket.
+     * @param bag Reuse bag.
+     * @param reuse {@code True} if it is possible to use the reuse list.
+     * @return Added stripe.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private Stripe addStripe(int bucket, ReuseBag bag, boolean reuse) throws IgniteInternalCheckedException {
+        long pageId = allocatePage(bag, reuse);
+
+        init(pageId, PagesListNodeIo.VERSIONS.latest());
+
+        Stripe stripe = new Stripe(pageId, true);
+
+        for (; ; ) {
+            Stripe[] old = getBucket(bucket);
+            Stripe[] upd;
+
+            if (old != null) {
+                int len = old.length;
+
+                upd = Arrays.copyOf(old, len + 1);
+
+                upd[len] = stripe;
+            } else {
+                upd = new Stripe[]{stripe};
+            }
+
+            if (casBucket(bucket, old, upd)) {
+                changed();
+
+                return stripe;
+            }
+        }
+    }
+
+    /**
+     * Stripe tail update.
+     *
+     * @param bucket Bucket index.
+     * @param oldTailId Old tail page ID to replace.
+     * @param newTailId New tail page ID.
+     * @return {@code True} if stripe was removed.
+     */
+    private boolean updateTail(int bucket, long oldTailId, long newTailId) {
+        int idx = -1;
+
+        try {
+            for (; ; ) {
+                Stripe[] tails = getBucket(bucket);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Update tail [list=" + name() + ", bucket=" + bucket + ", oldTailId=" + oldTailId
+                            + ", newTailId=" + newTailId + ", tails=" + Arrays.toString(tails) + ']');
+                }
+
+                // Tail must exist to be updated.
+                assert !nullOrEmpty(tails) : "Missing tails [bucket=" + bucket + ", tails=" + Arrays.toString(tails)
+                        + ", metaPage=" + hexLong(metaPageId) + ", grpId=" + grpId + ']';
+
+                idx = findTailIndex(tails, oldTailId, idx);
+
+                assert tails[idx].tailId == oldTailId;
+
+                if (newTailId == 0L) {
+                    if (tails.length <= maxStripesPerBucket / 2) {
+                        tails[idx].empty = true;
+
+                        return false;
+                    }
+
+                    Stripe[] newTails;
+
+                    if (tails.length != 1) {
+                        newTails = remove(tails, idx);
+                    } else {
+                        newTails = null; // Drop the bucket completely.
+                    }
+
+                    if (casBucket(bucket, tails, newTails)) {
+                        // Reset tailId to invalidate locking when the stripe was taken concurrently.
+                        tails[idx].tailId = 0L;
+
+                        return true;
+                    }
+                } else {
+                    // It is safe to assign new tail since we do it only when write lock on tail is held.
+                    tails[idx].tailId = newTailId;
+
+                    return true;
+                }
+            }
+        } finally {
+            changed();
+        }
+    }
+
+    /**
+     * Returns first found index of the given tail ID.
+     *
+     * @param tails Tails.
+     * @param tailId Tail ID to find.
+     * @param expIdx Expected index.
+     * @throws IllegalStateException If tail not found.
+     */
+    private static int findTailIndex(Stripe[] tails, long tailId, int expIdx) {
+        if (expIdx != -1 && tails.length > expIdx && tails[expIdx].tailId == tailId) {
+            return expIdx;
+        }
+
+        for (int i = 0; i < tails.length; i++) {
+            if (tails[i].tailId == tailId) {
+                return i;
+            }
+        }
+
+        throw new IllegalStateException("Tail not found: " + tailId);
+    }
+
+    /**
+     * Returns a stripe to put a page into.
+     *
+     * @param bucket Bucket.
+     * @param bag Reuse bag.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private Stripe getStripeForPut(int bucket, ReuseBag bag) throws IgniteInternalCheckedException {
+        Stripe[] tails = getBucket(bucket);
+
+        if (tails == null) {
+            return addStripe(bucket, bag, true);
+        }
+
+        return randomTail(tails);
+    }
+
+    /**
+     * Returns random tail.
+     *
+     * @param tails Tails.
+     */
+    private static Stripe randomTail(Stripe[] tails) {
+        int len = tails.length;
+
+        assert len != 0;
+
+        return tails[randomInt(len)];
+    }
+
+    /**
+     * !!! For tests only, does not provide any correctness guarantees for concurrent access.
+     *
+     * @param bucket Bucket index.
+     * @return Number of pages stored in this list.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    protected final long storedPagesCount(int bucket) throws IgniteInternalCheckedException {
+        long res = 0;
+
+        Stripe[] tails = getBucket(bucket);
+
+        if (tails != null) {
+            for (Stripe tail : tails) {
+                long tailId = tail.tailId;
+
+                while (tailId != 0L) {
+                    final long pageId = tailId;
+                    final long page = acquirePage(pageId, IoStatisticsHolderNoOp.INSTANCE);
+                    try {
+                        long pageAddr = readLock(pageId, page);
+
+                        assert pageAddr != 0L;
+
+                        try {
+                            PagesListNodeIo io = PagesListNodeIo.VERSIONS.forPage(pageAddr);
+
+                            int cnt = io.getCount(pageAddr);
+
+                            assert cnt >= 0;
+
+                            res += cnt;
+                            tailId = io.getPreviousId(pageAddr);
+
+                            // In reuse bucket the page itself can be used as a free page.
+                            if (isReuseBucket(bucket) && tailId != 0L) {
+                                res++;
+                            }
+                        } finally {
+                            readUnlock(pageId, page, pageAddr);
+                        }
+                    } finally {
+                        releasePage(pageId, page);
+                    }
+                }
+            }
+        }
+
+        assert res == bucketsSize.get(bucket) : "Wrong bucket size counter [exp=" + res + ", cntr=" + bucketsSize.get(bucket) + ']';
+
+        return res;
+    }
+
+    /**
+     * Puts data page.
+     *
+     * @param bag Reuse bag.
+     * @param dataId Data page ID.
+     * @param dataAddr Data page address.
+     * @param bucket Bucket.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    protected final void put(
+            @Nullable ReuseBag bag,
+            final long dataId,
+            final long dataAddr,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        assert bag == null ^ dataAddr == 0L;
+
+        if (bag != null && bag.isEmpty()) {
+            // Skip allocating stripe for empty bag.
+            return;
+        }
+
+        if (bag == null && onheapListCachingEnabled && putDataPage(getBucketCache(bucket, true), dataId, dataAddr, bucket)) {
+            // Successfully put page to the onheap pages list cache.
+            if (log.isDebugEnabled()) {
+                log.debug("Put page to pages list cache [list=" + name() + ", bucket=" + bucket
+                        + ", dataId=" + dataId + ']');
+            }
+
+            return;
+        }
+
+        for (int lockAttempt = 0; ; ) {
+            Stripe stripe = getStripeForPut(bucket, bag);
+
+            // No need to continue if the bag has been utilized at getStripeForPut (a free page can be used for the pages list).
+            if (bag != null && bag.isEmpty()) {
+                return;
+            }
+
+            final long tailId = stripe.tailId;
+
+            // Stripe was removed from bucket concurrently.
+            if (tailId == 0L) {
+                continue;
+            }
+
+            final long tailPage = acquirePage(tailId, statHolder);
+
+            try {
+                long tailAddr = writeLockPage(tailId, tailPage, bucket, lockAttempt++, bag); // Explicit check.
+
+                if (tailAddr == 0L) {
+                    // No need to continue if bag has been utilized at writeLockPage.
+                    if (bag != null && bag.isEmpty()) {
+                        return;
+                    } else {
+                        continue;
+                    }
+                }
+
+                if (stripe.tailId != tailId) {
+                    // Another thread took the last page.
+                    writeUnlock(tailId, tailPage, tailAddr, false);
+
+                    lockAttempt--; // Ignore current attempt.
+
+                    continue;
+                }
+
+                assert getPageId(tailAddr) == tailId
+                        : "tailId = " + hexLong(tailId) + ", pageId = " + hexLong(getPageId(tailAddr));
+                assert getType(tailAddr) == T_PAGE_LIST_NODE
+                        : "tailId = " + hexLong(tailId) + ", type = " + getType(tailAddr);
+
+                boolean ok = false;
+
+                try {
+                    PagesListNodeIo io = pageMem.ioRegistry().resolve(tailAddr);
+
+                    ok = bag != null
+                            // Here we can always take pages from the bag to build our list.
+                            ? putReuseBag(tailId, tailAddr, io, bag, bucket, statHolder) :
+                            // Here we can use the data page to build list only if it is empty and
+                            // it is being put into reuse bucket. Usually this will be true, but there is
+                            // a case when there is no reuse bucket in the free list, but then deadlock
+                            // on node page allocation from separate reuse list is impossible.
+                            // If the data page is not empty it can not be put into reuse bucket and thus
+                            // the deadlock is impossible as well.
+                            putDataPage(tailId, tailAddr, io, dataId, dataAddr, bucket, statHolder);
+
+                    if (ok) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Put page to pages list [list=" + name() + ", bucket=" + bucket
+                                    + ", dataId=" + dataId + ", tailId=" + tailId + ']');
+                        }
+
+                        stripe.empty = false;
+
+                        return;
+                    }
+                } finally {
+                    writeUnlock(tailId, tailPage, tailAddr, ok);
+                }
+            } finally {
+                releasePage(tailId, tailPage);
+            }
+        }
+    }
+
+    /**
+     * Puts data page.
+     *
+     * @param pageId Page ID.
+     * @param pageAddr Page address.
+     * @param io IO.
+     * @param dataId Data page ID.
+     * @param dataAddr Data page address.
+     * @param bucket Bucket.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return {@code true} If succeeded.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private boolean putDataPage(
+            final long pageId,
+            final long pageAddr,
+            PagesListNodeIo io,
+            final long dataId,
+            final long dataAddr,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        if (io.getNextId(pageAddr) != 0L) {
+            return false; // Split.
+        }
+
+        int idx = io.addPage(pageAddr, dataId, pageSize());
+
+        if (idx == -1) {
+            handlePageFull(pageId, pageAddr, io, dataId, dataAddr, bucket, statHolder);
+        } else {
+            incrementBucketSize(bucket);
+
+            AbstractDataPageIo dataIo = pageMem.ioRegistry().resolve(dataAddr);
+            dataIo.setFreeListPageId(dataAddr, pageId);
+        }
+
+        return true;
+    }
+
+    /**
+     * Puts data page to page-list cache.
+     *
+     * @param pagesCache Page-list cache onheap.
+     * @param dataId Data page ID.
+     * @param dataAddr Data page address.
+     * @param bucket Bucket.
+     * @return {@code true} If succeeded.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private boolean putDataPage(
+            PagesCache pagesCache,
+            final long dataId,
+            final long dataAddr,
+            int bucket
+    ) throws IgniteInternalCheckedException {
+        if (pagesCache.add(dataId)) {
+            incrementBucketSize(bucket);
+
+            AbstractDataPageIo dataIo = pageMem.ioRegistry().resolve(dataAddr);
+
+            if (dataIo.getFreeListPageId(dataAddr) != 0L) {
+                dataIo.setFreeListPageId(dataAddr, 0L);
+            }
+
+            pageCacheChanged();
+
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Handles the case when the pages list node page is full while adding a data page.
+     *
+     * @param pageId Page ID.
+     * @param pageAddr Page address.
+     * @param io IO.
+     * @param dataId Data page ID.
+     * @param dataAddr Data page address.
+     * @param bucket Bucket index.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private void handlePageFull(
+            final long pageId,
+            final long pageAddr,
+            PagesListNodeIo io,
+            final long dataId,
+            final long dataAddr,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        AbstractDataPageIo dataIo = pageMem.ioRegistry().resolve(dataAddr);
+
+        // Attempt to add page failed: the node page is full.
+        if (isReuseBucket(bucket)) {
+            // If we are on the reuse bucket, we can not allocate new page, because it may cause deadlock.
+            assert dataIo.isEmpty(dataAddr); // We can put only empty data pages to reuse bucket.
+
+            // Change the page type to auxiliary and add it as the next node page to this list.
+            long newDataId = changeType(dataId, FLAG_AUX);
+
+            setupNextPage(io, pageId, pageAddr, newDataId, dataAddr);
+
+            // In reuse bucket the page itself can be used as a free page.
+            incrementBucketSize(bucket);
+
+            updateTail(bucket, pageId, newDataId);
+        } else {
+            // Just allocate a new node page and add our data page there.
+            final long nextId = allocatePage(null);
+            final long nextPage = acquirePage(nextId, statHolder);
+
+            try {
+                long nextPageAddr = writeLock(nextId, nextPage); // Newly allocated page.
+
+                assert nextPageAddr != 0L;
+
+                try {
+                    setupNextPage(io, pageId, pageAddr, nextId, nextPageAddr);
+
+                    int idx = io.addPage(nextPageAddr, dataId, pageSize());
+
+                    assert idx != -1;
+
+                    dataIo.setFreeListPageId(dataAddr, nextId);
+
+                    incrementBucketSize(bucket);
+
+                    updateTail(bucket, pageId, nextId);
+                } finally {
+                    writeUnlock(nextId, nextPage, nextPageAddr, true);
+                }
+            } finally {
+                releasePage(nextId, nextPage);
+            }
+        }
+    }
+
+    /**
+     * Puts pages from the reuse bag into the pages list.
+     *
+     * @param pageId Page ID.
+     * @param pageAddr Page address.
+     * @param io IO.
+     * @param bag Reuse bag.
+     * @param bucket Bucket.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return {@code true} If succeeded.
+     * @throws IgniteInternalCheckedException if failed.
+     */
+    private boolean putReuseBag(
+            final long pageId,
+            final long pageAddr,
+            PagesListNodeIo io,
+            ReuseBag bag,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        assert bag != null : "bag is null";
+        assert !bag.isEmpty() : "bag is empty";
+
+        if (io.getNextId(pageAddr) != 0L) {
+            return false; // Split.
+        }
+
+        long nextId;
+
+        long prevId = pageId;
+        long prevAddr = pageAddr;
+
+        IgniteLongList locked = null; // TODO may be unlock right away and do not keep all these pages locked?

Review comment:
       Please add a link to https://issues.apache.org/jira/browse/IGNITE-16350, for example.
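       For instance, the TODO could carry the ticket reference like this (just a sketch of the suggested wording, not the final form):

           IgniteLongList locked = null; // TODO IGNITE-16350 Maybe unlock right away and do not keep all these pages locked?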

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
##########
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.pagememory.util;
 
+import org.apache.ignite.internal.pagememory.FullPageId;

Review comment:
       What is this? You moved that class?

##########
File path: modules/page-memory/src/test/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
##########
@@ -15,3 +15,4 @@
 # limitations under the License.
 #
 org.apache.ignite.internal.pagememory.TestPageIoModule
+org.apache.ignite.internal.pagememory.PageMemoryIoModule

Review comment:
       Wait a minute, why did you add this here?
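       For context, every class named in a META-INF/services file is discovered through the standard ServiceLoader mechanism, so this line makes the test setup pick up the production module as well. A minimal sketch of what gets loaded, assuming PageIoModule is the service interface registered in that file (the class name PageIoModuleLoadingSketch is hypothetical and the real registry wiring in the module may differ):

           import java.util.ServiceLoader;
           import org.apache.ignite.internal.pagememory.io.PageIoModule;

           class PageIoModuleLoadingSketch {
               static void loadModules() {
                   // Both TestPageIoModule and, after this change, PageMemoryIoModule are
                   // discovered here from META-INF/services and can register their page IOs.
                   for (PageIoModule module : ServiceLoader.load(PageIoModule.class)) {
                       System.out.println("Discovered: " + module.getClass().getName());
                   }
               }
           }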

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/PageIo.java
##########
@@ -54,86 +55,75 @@
  * </ol>
  */
 public abstract class PageIo {
-    /**
-     * Maximum value for page type.
-     */
+    /** Maximum value for page type. */
     public static final int MAX_IO_TYPE = 65535 - 1;
 
-    /**
-     * Offset for "short" page type.
-     */
+    /** Offset for "short" page type. */
     public static final int TYPE_OFF = 0;
 
-    /**
-     * Offset for "short" page version.
-     */
+    /** Offset for "short" page version. */
     public static final int VER_OFF = TYPE_OFF + Short.BYTES;
 
-    /**
-     * Offset for "int" CRC.
-     */
+    /** Offset for "int" CRC. */
     public static final int CRC_OFF = VER_OFF + Short.BYTES;
 
-    /**
-     * Offset for "long" page ID.
-     */
+    /** Offset for "long" page ID. */
     public static final int PAGE_ID_OFF = CRC_OFF + Integer.BYTES;
 
-    /**
-     * Offset for "byte" rotated ID.
-     */
+    /** Offset for "byte" rotated ID. */
     public static final int ROTATED_ID_PART_OFF = PAGE_ID_OFF + Long.BYTES;
 
-    /**
-     * Offset for "byte" compression type.
-     */
+    /** Offset for "byte" compression type. */
     private static final int COMPRESSION_TYPE_OFF = ROTATED_ID_PART_OFF + Byte.BYTES;
 
-    /**
-     * Offset for "short" compressed size.
-     */
+    /** Offset for "short" compressed size. */
     private static final int COMPRESSED_SIZE_OFF = COMPRESSION_TYPE_OFF + Byte.BYTES;
 
-    /**
-     * Offset for "short" compacted size.
-     */
+    /** Offset for "short" compacted size. */
     private static final int COMPACTED_SIZE_OFF = COMPRESSED_SIZE_OFF + Short.BYTES;
 
-    /**
-     * Offset for reserved "short" value.
-     */
+    /** Offset for reserved "short" value. */
     private static final int RESERVED_SHORT_OFF = COMPACTED_SIZE_OFF + Short.BYTES;
 
-    /**
-     * Offset for reserved "long" value.
-     */
+    /** Offset for reserved "long" value. */
     private static final int RESERVED_2_OFF = RESERVED_SHORT_OFF + Short.BYTES;
 
-    /**
-     * Offset for reserved "long" value.
-     */
+    /** Offset for reserved "long" value. */
     private static final int RESERVED_3_OFF = RESERVED_2_OFF + Long.BYTES;
 
-    /**
-     * Total size of common header, including reserved bytes.
-     */
+    /** Total size of common header, including reserved bytes. */
     public static final int COMMON_HEADER_END = RESERVED_3_OFF + Long.BYTES;
 
-    /**
-     * IO version.
-     */
+    /* All the page types. */
+
+    /** Data page type. */
+    public static final short T_DATA = 1;
+
+    /** Meta page type. */
+    public static final short T_META = 11;
+
+    /** Type page list meta. */
+    public static final short T_PAGE_LIST_META = 12;
+
+    /** Type page list node. */
+    public static final short T_PAGE_LIST_NODE = 13;
+
+    /** Simple (just byte array) data page type. */
+    public static final short T_DATA_PART = 32;

Review comment:
       Also, please remove unused page types.

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java
##########
@@ -0,0 +1,2172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.freelist;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.io.PageIo.T_DATA;
+import static org.apache.ignite.internal.pagememory.io.PageIo.T_DATA_PART;
+import static org.apache.ignite.internal.pagememory.io.PageIo.T_META;
+import static org.apache.ignite.internal.pagememory.io.PageIo.T_PAGE_LIST_NODE;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.MAX_ITEMID_NUM;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.changeType;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.ArrayUtils.remove;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.datastructure.DataStructure;
+import org.apache.ignite.internal.pagememory.freelist.io.PagesListMetaIo;
+import org.apache.ignite.internal.pagememory.freelist.io.PagesListNodeIo;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.reuse.ReuseBag;
+import org.apache.ignite.internal.pagememory.util.PageHandler;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteLongList;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Striped doubly-linked list of page IDs optionally organized in buckets.
+ */
+public abstract class PagesList extends DataStructure {
+    /** Count of tries to lock a stripe before falling back to a blocking lock. */
+    public static final String IGNITE_PAGES_LIST_TRY_LOCK_ATTEMPTS = "IGNITE_PAGES_LIST_TRY_LOCK_ATTEMPTS";
+
+    /** Maximum count of stripes per bucket. */
+    public static final String IGNITE_PAGES_LIST_STRIPES_PER_BUCKET = "IGNITE_PAGES_LIST_STRIPES_PER_BUCKET";
+
+    /**
+     * Disables onheap caching of pages lists (free lists and reuse lists). If persistence is enabled, changes to pages lists are not
+     * stored to page memory immediately; they are cached in an onheap buffer and flushed to page memory on a checkpoint. This property
+     * allows disabling such onheap caching.
+     */
+    public static final String IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING = "IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING";
+
+    private final int tryLockAttempts = getInteger(IGNITE_PAGES_LIST_TRY_LOCK_ATTEMPTS, 10);
+
+    private final int maxStripesPerBucket = getInteger(
+            IGNITE_PAGES_LIST_STRIPES_PER_BUCKET,
+            Math.max(8, Runtime.getRuntime().availableProcessors())
+    );
+
+    private final boolean pagesListCachingDisabled = getBoolean(IGNITE_PAGES_LIST_DISABLE_ONHEAP_CACHING, false);
+
+    /** Logger. */
+    protected final IgniteLogger log;
+
+    /** Bucket sizes. */
+    protected final AtomicLongArray bucketsSize;
+
+    /** Flag indicating that the {@link PagesList} has been changed. */
+    protected volatile boolean changed;
+
+    /** Page cache changed. */
+    protected volatile boolean pageCacheChanged;
+
+    /** Page ID to store list metadata. */
+    private final long metaPageId;
+
+    /** Number of buckets. */
+    private final int buckets;
+
+    /** Flag to enable/disable onheap list caching. */
+    private volatile boolean onheapListCachingEnabled;
+
+    /** Handler that cuts the tail node off a stripe, making the handled node page the new tail. */
+    private final PageHandler<Void, Boolean> cutTail = new CutTail();
+
+    /** Handler that moves a page from the onheap cache into the bucket matching its current free space. */
+    private final PageHandler<Void, Boolean> putBucket = new PutBucket();
+
+    private final class CutTail implements PageHandler<Void, Boolean> {
+        /** {@inheritDoc} */
+        @Override
+        public Boolean run(
+                int cacheId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo iox,
+                Void ignore,
+                int bucket,
+                IoStatisticsHolder statHolder
+        ) {
+            assert getPageId(pageAddr) == pageId;
+
+            PagesListNodeIo io = (PagesListNodeIo) iox;
+
+            long tailId = io.getNextId(pageAddr);
+
+            assert tailId != 0;
+
+            io.setNextId(pageAddr, 0L);
+
+            updateTail(bucket, tailId, pageId);
+
+            return TRUE;
+        }
+    }
+
+    private final class PutBucket implements PageHandler<Void, Boolean> {
+        /** {@inheritDoc} */
+        @Override
+        public Boolean run(
+                int cacheId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo iox,
+                Void ignore,
+                int oldBucket,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            decrementBucketSize(oldBucket);
+
+            // Recalculate bucket because page free space can be changed concurrently.
+            int freeSpace = ((AbstractDataPageIo<Storable>) iox).getFreeSpace(pageAddr);
+
+            int newBucket = getBucketIndex(freeSpace);
+
+            if (newBucket != oldBucket && log.isDebugEnabled()) {
+                log.debug("Bucket changed when moving from heap to PageMemory [list=" + name() + ", oldBucket=" + oldBucket
+                        + ", newBucket=" + newBucket + ", pageId=" + pageId + ']');
+            }
+
+            if (newBucket >= 0) {
+                put(null, pageId, pageAddr, newBucket, statHolder);
+            }
+
+            return TRUE;
+        }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param grpId Group ID.
+     * @param name Structure name (for debug purpose).
+     * @param pageMem Page memory.
+     * @param lockLsnr Page lock listener.
+     * @param defaultPageFlag Default flag value for allocated pages. One of {@link PageIdAllocator#FLAG_DATA} or {@link
+     * PageIdAllocator#FLAG_AUX}.
+     * @param log Logger.
+     * @param buckets Number of buckets.
+     * @param metaPageId Metadata page ID.
+     */
+    protected PagesList(
+            int grpId,
+            String name,
+            PageMemory pageMem,
+            PageLockListener lockLsnr,
+            byte defaultPageFlag,
+            IgniteLogger log,
+            int buckets,
+            long metaPageId
+    ) {
+        super(name, grpId, null, pageMem, lockLsnr, defaultPageFlag);
+
+        this.log = log;
+
+        this.buckets = buckets;
+        this.metaPageId = metaPageId;
+
+        onheapListCachingEnabled = isCachingApplicable();
+
+        bucketsSize = new AtomicLongArray(buckets);
+    }
+
+    /**
+     * Initializes a {@link PagesList}.
+     *
+     * @param metaPageId Metadata page ID.
+     * @param initNew {@code True} if a new list is created, {@code false} if it should be initialized from metadata.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    protected final void init(long metaPageId, boolean initNew) throws IgniteInternalCheckedException {
+        if (metaPageId != 0L) {
+            if (initNew) {
+                init(metaPageId, PagesListMetaIo.VERSIONS.latest());
+            } else {
+                Map<Integer, IgniteLongList> bucketsData = new HashMap<>();
+
+                long nextId = metaPageId;
+
+                while (nextId != 0) {
+                    final long pageId = nextId;
+                    final long page = acquirePage(pageId, IoStatisticsHolderNoOp.INSTANCE);
+
+                    try {
+                        long pageAddr = readLock(pageId, page); // No concurrent recycling on init.
+
+                        assert pageAddr != 0L;
+
+                        try {
+                            PagesListMetaIo io = PagesListMetaIo.VERSIONS.forPage(pageAddr);
+
+                            io.getBucketsData(pageAddr, bucketsData);
+
+                            nextId = io.getNextMetaPageId(pageAddr);
+
+                            assert nextId != pageId : "Loop detected [next=" + hexLong(nextId) + ", cur=" + hexLong(pageId) + ']';
+                        } finally {
+                            readUnlock(pageId, page, pageAddr);
+                        }
+                    } finally {
+                        releasePage(pageId, page);
+                    }
+                }
+
+                for (Map.Entry<Integer, IgniteLongList> e : bucketsData.entrySet()) {
+                    int bucket = e.getKey();
+                    long bucketSize = 0;
+
+                    Stripe[] old = getBucket(bucket);
+                    assert old == null;
+
+                    long[] upd = e.getValue().array();
+
+                    Stripe[] tails = new Stripe[upd.length];
+
+                    for (int i = 0; i < upd.length; i++) {
+                        long tailId = upd[i];
+
+                        long prevId = tailId;
+                        int cnt = 0;
+
+                        while (prevId != 0L) {
+                            final long pageId = prevId;
+                            final long page = acquirePage(pageId, IoStatisticsHolderNoOp.INSTANCE);
+                            try {
+                                long pageAddr = readLock(pageId, page);
+
+                                assert pageAddr != 0L;
+
+                                try {
+                                    PagesListNodeIo io = PagesListNodeIo.VERSIONS.forPage(pageAddr);
+
+                                    cnt += io.getCount(pageAddr);
+                                    prevId = io.getPreviousId(pageAddr);
+
+                                    // In reuse bucket the page itself can be used as a free page.
+                                    if (isReuseBucket(bucket) && prevId != 0L) {
+                                        cnt++;
+                                    }
+                                } finally {
+                                    readUnlock(pageId, page, pageAddr);
+                                }
+                            } finally {
+                                releasePage(pageId, page);
+                            }
+                        }
+
+                        Stripe stripe = new Stripe(tailId, cnt == 0);
+                        tails[i] = stripe;
+                        bucketSize += cnt;
+                    }
+
+                    boolean ok = casBucket(bucket, null, tails);
+
+                    assert ok;
+
+                    bucketsSize.set(bucket, bucketSize);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns {@code True} if onheap caching is applicable for this pages list, or {@code false} if caching is disabled explicitly by a
+     * system property or if the pages list belongs to an in-memory data region (in that case onheap caching makes no sense).
+     */
+    private boolean isCachingApplicable() {
+        return !pagesListCachingDisabled;
+    }
+
+    /**
+     * Saves metadata without taking an exclusive lock on it.
+     *
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void saveMetadata(IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
+        long nextPageId = metaPageId;
+
+        assert nextPageId != 0;
+
+        flushBucketsCache(statHolder);
+
+        if (!changed) {
+            return;
+        }
+
+        // This guarantees that any concurrent changes of the list will be detected.
+        changed = false;
+
+        try {
+            long unusedPageId = writeFreeList(nextPageId);
+
+            markUnusedPagesDirty(unusedPageId);
+        } catch (Throwable e) {
+            changed = true; // Restore the changed flag due to the exception.
+
+            throw e;
+        }
+    }
+
+    /**
+     * Flush onheap cached pages lists to page memory.
+     *
+     * @param statHolder Statistic holder.
+     * @throws IgniteInternalCheckedException If failed to write a page.
+     */
+    private void flushBucketsCache(IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
+        if (!isCachingApplicable() || !pageCacheChanged) {
+            return;
+        }
+
+        pageCacheChanged = false;
+
+        onheapListCachingEnabled = false;
+
+        int lockedPages = 0;
+
+        try {
+            for (int bucket = 0; bucket < buckets; bucket++) {
+                PagesCache pagesCache = getBucketCache(bucket, false);
+
+                if (pagesCache == null) {
+                    continue;
+                }
+
+                IgniteLongList pages = pagesCache.flush();
+
+                if (pages != null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Move pages from heap to PageMemory [list=" + name() + ", bucket=" + bucket
+                                + ", pages=" + pages + ']');
+                    }
+
+                    for (int i = 0; i < pages.size(); i++) {
+                        long pageId = pages.get(i);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Move page from heap to PageMemory [list=" + name() + ", bucket=" + bucket
+                                    + ", pageId=" + pageId + ']');
+                        }
+
+                        Boolean res = write(pageId, putBucket, bucket, null, statHolder);
+
+                        if (res == null) {
+                            // Return the page to the onheap pages list if we can't lock it.
+                            pagesCache.add(pageId);
+
+                            lockedPages++;
+                        }
+                    }
+                }
+            }
+        } finally {
+            onheapListCachingEnabled = true;
+        }
+
+        if (lockedPages != 0) {
+            if (log.isInfoEnabled()) {
+                log.info("Several pages were locked and weren't flushed on disk [grp=" + grpName
+                        + ", lockedPages=" + lockedPages + ']');
+            }
+
+            pageCacheChanged = true;
+        }
+    }
+
+    /**
+     * Write free list data to page memory.
+     *
+     * @param nextPageId First free page id.
+     * @return Unused free list page id.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private long writeFreeList(long nextPageId) throws IgniteInternalCheckedException {
+        long curId = 0L;
+        long curPage = 0L;
+        long curAddr = 0L;
+
+        PagesListMetaIo curIo = null;
+
+        try {
+            for (int bucket = 0; bucket < buckets; bucket++) {
+                Stripe[] tails = getBucket(bucket);
+
+                if (tails != null) {
+                    int tailIdx = 0;
+
+                    while (tailIdx < tails.length) {
+                        int written = curPage != 0L ? curIo.addTails(pageMem.realPageSize(grpId), curAddr, bucket, tails, tailIdx) : 0;
+
+                        if (written == 0) {
+                            if (nextPageId == 0L) {
+                                nextPageId = allocatePageNoReuse();
+
+                                if (curPage != 0L) {
+                                    curIo.setNextMetaPageId(curAddr, nextPageId);
+
+                                    releaseAndWriteUnlock(curId, curPage, curAddr);
+                                }
+
+                                curId = nextPageId;
+                                curPage = acquirePage(curId, IoStatisticsHolderNoOp.INSTANCE);
+                                curAddr = writeLock(curId, curPage);
+
+                                curIo = PagesListMetaIo.VERSIONS.latest();
+
+                                curIo.initNewPage(curAddr, curId, pageSize());
+                            } else {
+                                releaseAndWriteUnlock(curId, curPage, curAddr);
+
+                                curId = nextPageId;
+                                curPage = acquirePage(curId, IoStatisticsHolderNoOp.INSTANCE);
+                                curAddr = writeLock(curId, curPage);
+
+                                curIo = PagesListMetaIo.VERSIONS.forPage(curAddr);
+
+                                curIo.resetCount(curAddr);
+                            }
+
+                            nextPageId = curIo.getNextMetaPageId(curAddr);
+                        } else {
+                            tailIdx += written;
+                        }
+                    }
+                }
+            }
+        } finally {
+            releaseAndWriteUnlock(curId, curPage, curAddr);
+        }
+
+        return nextPageId;
+    }
+
+    /**
+     * Mark unused pages as dirty.
+     *
+     * @param nextPageId First unused page.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private void markUnusedPagesDirty(long nextPageId) throws IgniteInternalCheckedException {
+        while (nextPageId != 0L) {
+            long pageId = nextPageId;
+
+            long page = acquirePage(pageId, IoStatisticsHolderNoOp.INSTANCE);
+            try {
+                long pageAddr = writeLock(pageId, page);
+
+                try {
+                    PagesListMetaIo io = PagesListMetaIo.VERSIONS.forPage(pageAddr);
+
+                    io.resetCount(pageAddr);
+
+                    nextPageId = io.getNextMetaPageId(pageAddr);
+                } finally {
+                    writeUnlock(pageId, page, pageAddr, true);
+                }
+            } finally {
+                releasePage(pageId, page);
+            }
+        }
+    }
+
+    /**
+     * Releases the page and the write lock acquired on it.
+     *
+     * @param pageId Page ID.
+     * @param page Page absolute pointer.
+     * @param pageAddr Page address.
+     */
+    private void releaseAndWriteUnlock(long pageId, long page, long pageAddr) {
+        if (page != 0L) {
+            try {
+                writeUnlock(pageId, page, pageAddr, true);
+            } finally {
+                releasePage(pageId, page);
+            }
+        }
+    }
+
+    /**
+     * Gets the bucket index by the page free space.
+     *
+     * @param freeSpace Page free space.
+     * @return Bucket index, or -1 if the page doesn't belong to any bucket.
+     */
+    protected abstract int getBucketIndex(int freeSpace);
+
+    /**
+     * Returns bucket.
+     *
+     * @param bucket Bucket index.
+     */
+    protected abstract @Nullable Stripe[] getBucket(int bucket);
+
+    /**
+     * Sets a bucket through the "compare and set" operation.
+     *
+     * @param bucket Bucket index.
+     * @param exp Expected bucket.
+     * @param upd Updated bucket.
+     * @return {@code true} If succeeded.
+     */
+    protected abstract boolean casBucket(int bucket, Stripe[] exp, Stripe[] upd);
+
+    /**
+     * Returns {@code true} if it is a reuse bucket.
+     *
+     * @param bucket Bucket index.
+     */
+    protected abstract boolean isReuseBucket(int bucket);
+
+    /**
+     * Returns the bucket cache.
+     *
+     * @param bucket Bucket index.
+     * @param create Create if missing.
+     */
+    protected abstract @Nullable PagesCache getBucketCache(int bucket, boolean create);
+
+    /**
+     * Initializes a new {@link PagesListNodeIo} page and links it to the previous one.
+     *
+     * @param io IO.
+     * @param prevId Previous page ID.
+     * @param prev Previous page buffer.
+     * @param nextId Next page ID.
+     * @param next Next page buffer.
+     */
+    private void setupNextPage(PagesListNodeIo io, long prevId, long prev, long nextId, long next) {
+        assert io.getNextId(prev) == 0L;
+
+        io.initNewPage(next, nextId, pageSize());
+        io.setPreviousId(next, prevId);
+
+        io.setNextId(prev, nextId);
+    }
+
+    /**
+     * Adds stripe to the given bucket.
+     *
+     * @param bucket Bucket.
+     * @param bag Reuse bag.
+     * @param reuse {@code True} if it is possible to use the reuse list.
+     * @return Added stripe.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private Stripe addStripe(int bucket, ReuseBag bag, boolean reuse) throws IgniteInternalCheckedException {
+        long pageId = allocatePage(bag, reuse);
+
+        init(pageId, PagesListNodeIo.VERSIONS.latest());
+
+        Stripe stripe = new Stripe(pageId, true);
+
+        for (; ; ) {
+            Stripe[] old = getBucket(bucket);
+            Stripe[] upd;
+
+            if (old != null) {
+                int len = old.length;
+
+                upd = Arrays.copyOf(old, len + 1);
+
+                upd[len] = stripe;
+            } else {
+                upd = new Stripe[]{stripe};
+            }
+
+            if (casBucket(bucket, old, upd)) {
+                changed();
+
+                return stripe;
+            }
+        }
+    }
+
+    /**
+     * Updates the stripe tail.
+     *
+     * @param bucket Bucket index.
+     * @param oldTailId Old tail page ID to replace.
+     * @param newTailId New tail page ID.
+     * @return {@code True} if stripe was removed.
+     */
+    private boolean updateTail(int bucket, long oldTailId, long newTailId) {
+        int idx = -1;
+
+        try {
+            for (; ; ) {
+                Stripe[] tails = getBucket(bucket);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Update tail [list=" + name() + ", bucket=" + bucket + ", oldTailId=" + oldTailId
+                            + ", newTailId=" + newTailId + ", tails=" + Arrays.toString(tails));
+                }
+
+                // Tail must exist to be updated.
+                assert !nullOrEmpty(tails) : "Missing tails [bucket=" + bucket + ", tails=" + Arrays.toString(tails)
+                        + ", metaPage=" + hexLong(metaPageId) + ", grpId=" + grpId + ']';
+
+                idx = findTailIndex(tails, oldTailId, idx);
+
+                assert tails[idx].tailId == oldTailId;
+
+                if (newTailId == 0L) {
+                    if (tails.length <= maxStripesPerBucket / 2) {
+                        tails[idx].empty = true;
+
+                        return false;
+                    }
+
+                    Stripe[] newTails;
+
+                    if (tails.length != 1) {
+                        newTails = remove(tails, idx);
+                    } else {
+                        newTails = null; // Drop the bucket completely.
+                    }
+
+                    if (casBucket(bucket, tails, newTails)) {
+                        // Reset tailId for invalidation of locking when stripe was taken concurrently.
+                        tails[idx].tailId = 0L;
+
+                        return true;
+                    }
+                } else {
+                    // It is safe to assign new tail since we do it only when write lock on tail is held.
+                    tails[idx].tailId = newTailId;
+
+                    return true;
+                }
+            }
+        } finally {
+            changed();
+        }
+    }
+
+    /**
+     * Returns first found index of the given tail ID.
+     *
+     * @param tails Tails.
+     * @param tailId Tail ID to find.
+     * @param expIdx Expected index.
+     * @throws IllegalStateException If tail not found.
+     */
+    private static int findTailIndex(Stripe[] tails, long tailId, int expIdx) {
+        if (expIdx != -1 && tails.length > expIdx && tails[expIdx].tailId == tailId) {
+            return expIdx;
+        }
+
+        for (int i = 0; i < tails.length; i++) {
+            if (tails[i].tailId == tailId) {
+                return i;
+            }
+        }
+
+        throw new IllegalStateException("Tail not found: " + tailId);
+    }
+
+    /**
+     * Returns a stripe to put a page into.
+     *
+     * @param bucket Bucket.
+     * @param bag Reuse bag.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private Stripe getStripeForPut(int bucket, ReuseBag bag) throws IgniteInternalCheckedException {
+        Stripe[] tails = getBucket(bucket);
+
+        if (tails == null) {
+            return addStripe(bucket, bag, true);
+        }
+
+        return randomTail(tails);
+    }
+
+    /**
+     * Returns random tail.
+     *
+     * @param tails Tails.
+     */
+    private static Stripe randomTail(Stripe[] tails) {
+        int len = tails.length;
+
+        assert len != 0;
+
+        return tails[randomInt(len)];
+    }
+
+    /**
+     * !!! For tests only, does not provide any correctness guarantees for concurrent access.
+     *
+     * @param bucket Bucket index.
+     * @return Number of pages stored in this list.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    protected final long storedPagesCount(int bucket) throws IgniteInternalCheckedException {
+        long res = 0;
+
+        Stripe[] tails = getBucket(bucket);
+
+        if (tails != null) {
+            for (Stripe tail : tails) {
+                long tailId = tail.tailId;
+
+                while (tailId != 0L) {
+                    final long pageId = tailId;
+                    final long page = acquirePage(pageId, IoStatisticsHolderNoOp.INSTANCE);
+                    try {
+                        long pageAddr = readLock(pageId, page);
+
+                        assert pageAddr != 0L;
+
+                        try {
+                            PagesListNodeIo io = PagesListNodeIo.VERSIONS.forPage(pageAddr);
+
+                            int cnt = io.getCount(pageAddr);
+
+                            assert cnt >= 0;
+
+                            res += cnt;
+                            tailId = io.getPreviousId(pageAddr);
+
+                            // In reuse bucket the page itself can be used as a free page.
+                            if (isReuseBucket(bucket) && tailId != 0L) {
+                                res++;
+                            }
+                        } finally {
+                            readUnlock(pageId, page, pageAddr);
+                        }
+                    } finally {
+                        releasePage(pageId, page);
+                    }
+                }
+            }
+        }
+
+        assert res == bucketsSize.get(bucket) : "Wrong bucket size counter [exp=" + res + ", cntr=" + bucketsSize.get(bucket) + ']';
+
+        return res;
+    }
+
+    /**
+     * Puts data page.
+     *
+     * @param bag Reuse bag.
+     * @param dataId Data page ID.
+     * @param dataAddr Data page address.
+     * @param bucket Bucket.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    protected final void put(
+            @Nullable ReuseBag bag,
+            final long dataId,
+            final long dataAddr,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        assert bag == null ^ dataAddr == 0L;
+
+        if (bag != null && bag.isEmpty()) {
+            // Skip allocating stripe for empty bag.
+            return;
+        }
+
+        if (bag == null && onheapListCachingEnabled && putDataPage(getBucketCache(bucket, true), dataId, dataAddr, bucket)) {
+            // Successfully put page to the onheap pages list cache.
+            if (log.isDebugEnabled()) {
+                log.debug("Put page to pages list cache [list=" + name() + ", bucket=" + bucket
+                        + ", dataId=" + dataId + ']');
+            }
+
+            return;
+        }
+
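+        // Retry loop: pick a stripe, write-lock its tail and try to add the page; restart if the tail changed concurrently.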
+        for (int lockAttempt = 0; ; ) {
+            Stripe stripe = getStripeForPut(bucket, bag);
+
+            // No need to continue if the bag has been utilized at getStripeForPut (a free page can be used for the page list).
+            if (bag != null && bag.isEmpty()) {
+                return;
+            }
+
+            final long tailId = stripe.tailId;
+
+            // Stripe was removed from bucket concurrently.
+            if (tailId == 0L) {
+                continue;
+            }
+
+            final long tailPage = acquirePage(tailId, statHolder);
+
+            try {
+                long tailAddr = writeLockPage(tailId, tailPage, bucket, lockAttempt++, bag); // Explicit check.
+
+                if (tailAddr == 0L) {
+                    // No need to continue if bag has been utilized at writeLockPage.
+                    if (bag != null && bag.isEmpty()) {
+                        return;
+                    } else {
+                        continue;
+                    }
+                }
+
+                if (stripe.tailId != tailId) {
+                    // Another thread took the last page.
+                    writeUnlock(tailId, tailPage, tailAddr, false);
+
+                    lockAttempt--; // Ignore current attempt.
+
+                    continue;
+                }
+
+                assert getPageId(tailAddr) == tailId
+                        : "tailId = " + hexLong(tailId) + ", pageId = " + hexLong(getPageId(tailAddr));
+                assert getType(tailAddr) == T_PAGE_LIST_NODE
+                        : "tailId = " + hexLong(tailId) + ", type = " + getType(tailAddr);
+
+                boolean ok = false;
+
+                try {
+                    PagesListNodeIo io = pageMem.ioRegistry().resolve(tailAddr);
+
+                    ok = bag != null
+                            // Here we can always take pages from the bag to build our list.
+                            ? putReuseBag(tailId, tailAddr, io, bag, bucket, statHolder) :
+                            // Here we can use the data page to build list only if it is empty and
+                            // it is being put into reuse bucket. Usually this will be true, but there is
+                            // a case when there is no reuse bucket in the free list, but then deadlock
+                            // on node page allocation from separate reuse list is impossible.
+                            // If the data page is not empty it can not be put into reuse bucket and thus
+                            // the deadlock is impossible as well.
+                            putDataPage(tailId, tailAddr, io, dataId, dataAddr, bucket, statHolder);
+
+                    if (ok) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Put page to pages list [list=" + name() + ", bucket=" + bucket
+                                    + ", dataId=" + dataId + ", tailId=" + tailId + ']');
+                        }
+
+                        stripe.empty = false;
+
+                        return;
+                    }
+                } finally {
+                    writeUnlock(tailId, tailPage, tailAddr, ok);
+                }
+            } finally {
+                releasePage(tailId, tailPage);
+            }
+        }
+    }
+
+    /**
+     * Puts data page.
+     *
+     * @param pageId Page ID.
+     * @param pageAddr Page address.
+     * @param io IO.
+     * @param dataId Data page ID.
+     * @param dataAddr Data page address.
+     * @param bucket Bucket.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return {@code true} If succeeded.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private boolean putDataPage(
+            final long pageId,
+            final long pageAddr,
+            PagesListNodeIo io,
+            final long dataId,
+            final long dataAddr,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        if (io.getNextId(pageAddr) != 0L) {
+            return false; // Split.
+        }
+
+        int idx = io.addPage(pageAddr, dataId, pageSize());
+
+        if (idx == -1) {
+            handlePageFull(pageId, pageAddr, io, dataId, dataAddr, bucket, statHolder);
+        } else {
+            incrementBucketSize(bucket);
+
+            AbstractDataPageIo dataIo = pageMem.ioRegistry().resolve(dataAddr);
+            dataIo.setFreeListPageId(dataAddr, pageId);
+        }
+
+        return true;
+    }
+
+    /**
+     * Puts data page to page-list cache.
+     *
+     * @param pagesCache Page-list cache onheap.
+     * @param dataId Data page ID.
+     * @param dataAddr Data page address.
+     * @param bucket Bucket.
+     * @return {@code true} If succeeded.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private boolean putDataPage(
+            PagesCache pagesCache,
+            final long dataId,
+            final long dataAddr,
+            int bucket
+    ) throws IgniteInternalCheckedException {
+        if (pagesCache.add(dataId)) {
+            incrementBucketSize(bucket);
+
+            AbstractDataPageIo dataIo = pageMem.ioRegistry().resolve(dataAddr);
+
+            if (dataIo.getFreeListPageId(dataAddr) != 0L) {
+                dataIo.setFreeListPageId(dataAddr, 0L);
+            }
+
+            pageCacheChanged();
+
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Handles the case when the pages-list node page is full while adding a data page.
+     *
+     * @param pageId Page ID.
+     * @param pageAddr Page address.
+     * @param io IO.
+     * @param dataId Data page ID.
+     * @param dataAddr Data page address.
+     * @param bucket Bucket index.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private void handlePageFull(
+            final long pageId,
+            final long pageAddr,
+            PagesListNodeIo io,
+            final long dataId,
+            final long dataAddr,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        AbstractDataPageIo dataIo = pageMem.ioRegistry().resolve(dataAddr);
+
+        // Attempt to add page failed: the node page is full.
+        if (isReuseBucket(bucket)) {
+            // If we are on the reuse bucket, we cannot allocate a new page, because it may cause a deadlock.
+            assert dataIo.isEmpty(dataAddr); // We can put only empty data pages to reuse bucket.
+
+            // Change page type to index and add it as next node page to this list.
+            long newDataId = changeType(dataId, FLAG_AUX);
+
+            setupNextPage(io, pageId, pageAddr, newDataId, dataAddr);
+
+            // In reuse bucket the page itself can be used as a free page.
+            incrementBucketSize(bucket);
+
+            updateTail(bucket, pageId, newDataId);
+        } else {
+            // Just allocate a new node page and add our data page there.
+            final long nextId = allocatePage(null);
+            final long nextPage = acquirePage(nextId, statHolder);
+
+            try {
+                long nextPageAddr = writeLock(nextId, nextPage); // Newly allocated page.
+
+                assert nextPageAddr != 0L;
+
+                try {
+                    setupNextPage(io, pageId, pageAddr, nextId, nextPageAddr);
+
+                    int idx = io.addPage(nextPageAddr, dataId, pageSize());
+
+                    assert idx != -1;
+
+                    dataIo.setFreeListPageId(dataAddr, nextId);
+
+                    incrementBucketSize(bucket);
+
+                    updateTail(bucket, pageId, nextId);
+                } finally {
+                    writeUnlock(nextId, nextPage, nextPageAddr, true);
+                }
+            } finally {
+                releasePage(nextId, nextPage);
+            }
+        }
+    }
+
+    /**
+     * Puts page to reuse bag.
+     *
+     * @param pageId Page ID.
+     * @param pageAddr Page address.
+     * @param io IO.
+     * @param bag Reuse bag.
+     * @param bucket Bucket.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return {@code true} If succeeded.
+     * @throws IgniteInternalCheckedException if failed.
+     */
+    private boolean putReuseBag(
+            final long pageId,
+            final long pageAddr,
+            PagesListNodeIo io,
+            ReuseBag bag,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        assert bag != null : "bag is null";
+        assert !bag.isEmpty() : "bag is empty";
+
+        if (io.getNextId(pageAddr) != 0L) {
+            return false; // Split.
+        }
+
+        long nextId;
+
+        long prevId = pageId;
+        long prevAddr = pageAddr;
+
+        IgniteLongList locked = null; // TODO: maybe unlock right away and do not keep all these pages locked?
+
+        try {
+            while ((nextId = bag.pollFreePage()) != 0L) {
+                assert itemId(nextId) > 0 && itemId(nextId) <= MAX_ITEMID_NUM : hexLong(nextId);
+
+                int idx = io.addPage(prevAddr, nextId, pageSize());
+
+                if (idx == -1) { // Attempt to add page failed: the node page is full.
+                    final long nextPage = acquirePage(nextId, statHolder);
+
+                    try {
+                        long nextPageAddr = writeLock(nextId, nextPage); // Page from reuse bag can't be concurrently recycled.
+
+                        assert nextPageAddr != 0L;
+
+                        if (locked == null) {
+                            locked = new IgniteLongList(6);
+                        }
+
+                        locked.add(nextId);
+                        locked.add(nextPage);
+                        locked.add(nextPageAddr);
+
+                        setupNextPage(io, prevId, prevAddr, nextId, nextPageAddr);
+
+                        // In reuse bucket the page itself can be used as a free page.
+                        if (isReuseBucket(bucket)) {
+                            incrementBucketSize(bucket);
+                        }
+
+                        // Switch to this new page, which is now a part of our list
+                        // to add the rest of the bag to the new page.
+                        prevAddr = nextPageAddr;
+                        prevId = nextId;
+                    } finally {
+                        releasePage(nextId, nextPage);
+                    }
+                } else {
+                    incrementBucketSize(bucket);
+                }
+            }
+        } finally {
+            if (locked != null) {
+                // We have to update our bucket with the new tail.
+                updateTail(bucket, pageId, prevId);
+
+                // Release write.
+                for (int i = 0; i < locked.size(); i += 3) {
+                    writeUnlock(locked.get(i), locked.get(i + 1), locked.get(i + 2), true);
+                }
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns a stripe to take a page from.
+     *
+     * @param bucket Bucket index.
+     */
+    private @Nullable Stripe getStripeForTake(int bucket) {
+        Stripe[] tails = getBucket(bucket);
+
+        if (tails == null || bucketsSize.get(bucket) == 0) {
+            return null;
+        }
+
+        int len = tails.length;
+
+        int init = randomInt(len);
+        int cur = init;
+
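+        // Round-robin scan starting from a random stripe until a non-empty one is found or all stripes have been checked.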
+        while (true) {
+            Stripe stripe = tails[cur];
+
+            if (!stripe.empty) {
+                return stripe;
+            }
+
+            if ((cur = (cur + 1) % len) == init) {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Returns the page address if the page was locked, or {@code 0} if the lock attempt can be retried.
+     *
+     * @param pageId Page ID.
+     * @param page Page pointer.
+     * @param bucket Bucket.
+     * @param lockAttempt Lock attempts counter.
+     * @param bag Reuse bag.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private long writeLockPage(long pageId, long page, int bucket, int lockAttempt, ReuseBag bag) throws IgniteInternalCheckedException {
+        long pageAddr = tryWriteLock(pageId, page);
+
+        if (pageAddr != 0L) {
+            return pageAddr;
+        }
+
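+        // After tryLockAttempts failed attempts, try to add a new stripe to reduce contention; if the bucket
+        // already has the maximum number of stripes, fall back to a blocking write lock below.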
+        if (lockAttempt == tryLockAttempts) {
+            Stripe[] stripes = getBucket(bucket);
+
+            if (stripes == null || stripes.length < maxStripesPerBucket) {
+                addStripe(bucket, bag, !isReuseBucket(bucket));
+
+                return 0L;
+            }
+        }
+
+        return lockAttempt < tryLockAttempts ? 0L : writeLock(pageId, page); // Must be explicitly checked further.
+    }
+
+    /**
+     * Takes empty page from free list.
+     *
+     * @param bucket Bucket index.
+     * @param initIoVers Optional IO to initialize page.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return Removed page ID.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    protected long takeEmptyPage(
+            int bucket,
+            @Nullable IoVersions<?> initIoVers,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        PagesCache pagesCache = getBucketCache(bucket, false);
+
+        long pageId;
+
+        if (pagesCache != null && (pageId = pagesCache.poll()) != 0L) {
+            decrementBucketSize(bucket);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Take page from pages list cache [list=" + name() + ", bucket=" + bucket
+                        + ", pageId=" + pageId + ']');
+            }
+
+            assert !isReuseBucket(bucket) : "reuse bucket detected";
+
+            return pageId;
+        }
+
+        for (int lockAttempt = 0; ; ) {
+            Stripe stripe = getStripeForTake(bucket);
+
+            if (stripe == null) {
+                return 0L;
+            }
+
+            final long tailId = stripe.tailId;
+
+            // Stripe was removed from bucket concurrently.
+            if (tailId == 0L) {
+                continue;
+            }
+
+            final long tailPage = acquirePage(tailId, statHolder);
+
+            try {
+                long tailAddr = writeLockPage(tailId, tailPage, bucket, lockAttempt++, null); // Explicit check.
+
+                if (tailAddr == 0L) {
+                    continue;
+                }
+
+                if (stripe.empty || stripe.tailId != tailId) {
+                    // Another thread took the last page.
+                    writeUnlock(tailId, tailPage, tailAddr, false);
+
+                    if (bucketsSize.get(bucket) > 0) {
+                        lockAttempt--; // Ignore current attempt.
+
+                        continue;
+                    } else {
+                        return 0L;
+                    }
+                }
+
+                assert getPageId(tailAddr) == tailId
+                        : "tailId = " + hexLong(tailId) + ", pageId = " + hexLong(getPageId(tailAddr));
+                assert getType(tailAddr) == T_PAGE_LIST_NODE
+                        : "tailId = " + hexLong(tailId) + ", type = " + getType(tailAddr);
+
+                boolean dirty = false;
+                long dataPageId;
+                long recycleId = 0L;
+
+                try {
+                    PagesListNodeIo io = PagesListNodeIo.VERSIONS.forPage(tailAddr);
+
+                    if (io.getNextId(tailAddr) != 0) {
+                        // It is not a tail anymore, retry.
+                        continue;
+                    }
+
+                    pageId = io.takeAnyPage(tailAddr);
+
+                    if (pageId != 0L) {
+                        decrementBucketSize(bucket);
+
+                        dirty = true;
+
+                        if (isReuseBucket(bucket) && !(itemId(pageId) > 0 && itemId(pageId) <= MAX_ITEMID_NUM)) {
+                            throw corruptedFreeListException("Incorrectly recycled pageId in reuse bucket: " + hexLong(pageId), pageId);
+                        }
+
+                        if (isReuseBucket(bucket)) {
+                            byte flag = getFlag(initIoVers);
+
+                            PageIo initIo = initIoVers == null ? null : initIoVers.latest();
+
+                            dataPageId = initRecycledPage0(pageId, flag, initIo);
+                        } else {
+                            dataPageId = pageId;
+                        }
+
+                        if (io.isEmpty(tailAddr)) {
+                            long prevId = io.getPreviousId(tailAddr);
+
+                            // If we got an empty page in non-reuse bucket, move it back to reuse list
+                            // to prevent empty page leak to data pages.
+                            if (!isReuseBucket(bucket)) {
+                                if (prevId != 0L) {
+                                    Boolean ok = write(prevId, cutTail, null, bucket, FALSE, statHolder);
+
+                                    assert ok == TRUE : ok;
+
+                                    recycleId = recyclePage(tailId, tailAddr);
+                                } else {
+                                    stripe.empty = true;
+                                }
+                            } else {
+                                stripe.empty = prevId == 0L;
+                            }
+                        }
+                    } else {
+                        // The tail page is empty, but the stripe is not. It might
+                        // happen only if we are in the reuse bucket and it has
+                        // a previous page, so the current page can be collected.
+                        assert isReuseBucket(bucket);
+
+                        long prevId = io.getPreviousId(tailAddr);
+
+                        assert prevId != 0L;
+
+                        Boolean ok = write(prevId, cutTail, bucket, FALSE, statHolder);
+
+                        assert ok == TRUE : ok;
+
+                        decrementBucketSize(bucket);
+
+                        byte flag = getFlag(initIoVers);
+
+                        PageIo pageIo = initIoVers != null ? initIoVers.latest() : null;
+
+                        dataPageId = initReusedPage(tailId, tailAddr, partitionId(tailId), flag, pageIo);
+
+                        dirty = true;
+                    }
+
+                    // If we do not have a previous page (we are at head), then we still can return
+                    // current page but we have to drop the whole stripe. Since it is a reuse bucket,
+                    // we will not do that, but just return 0L, because this may produce contention on
+                    // meta page.
+                } finally {
+                    writeUnlock(tailId, tailPage, tailAddr, dirty);
+                }
+
+                // Put recycled page (if any) to the reuse bucket after tail is unlocked.
+                if (recycleId != 0L) {
+                    assert !isReuseBucket(bucket);
+
+                    reuseList.addForRecycle(new SingletonReuseBag(recycleId));
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Take page from pages list [list=" + name() + ", bucket=" + bucket
+                            + ", dataPageId=" + dataPageId + ", tailId=" + tailId + ']');
+                }
+
+                return dataPageId;
+            } finally {
+                releasePage(tailId, tailPage);
+            }
+        }
+    }
+
+    /**
+     * Returns {@link PageIdAllocator#FLAG_DATA} for group metas and data pages, {@link #defaultPageFlag} otherwise.
+     *
+     * @param initIoVers Optional IO versions list that will be used later to init the page.
+     */
+    private byte getFlag(@Nullable IoVersions<?> initIoVers) {
+        if (initIoVers != null) {
+            PageIo pageIo = initIoVers.latest();
+
+            switch (pageIo.getType()) {
+                case T_META:
+                case T_DATA:
+                case T_DATA_PART:
+                    return FLAG_DATA;
+                default:
+                    return defaultPageFlag;
+            }
+        }
+
+        return defaultPageFlag;
+    }
+
+    /**
+     * Creates a new page ID and updates the page content accordingly if necessary.
+     *
+     * @param pageId ID of the recycled page from the reuse bucket.
+     * @param flag New flag for the page.
+     * @param initIo IO to initialize the page, possibly {@code null}.
+     * @return New page ID.
+     * @throws IgniteInternalCheckedException If failed.
+     * @see PagesList#initReusedPage(long, long, int, byte, PageIo)
+     */
+    protected long initRecycledPage0(long pageId, byte flag, PageIo initIo) throws IgniteInternalCheckedException {
+        long page = pageMem.acquirePage(grpId, pageId);
+
+        try {
+            long pageAddr = pageMem.writeLock(grpId, pageId, page);
+
+            try {
+                return initReusedPage(pageId, pageAddr, partitionId(pageId), flag, initIo);
+            } finally {
+                pageMem.writeUnlock(grpId, pageId, page, true);
+            }
+        } finally {
+            pageMem.releasePage(grpId, pageId, page);
+        }
+    }
+
+    /**
+     * A reused page must obtain a correctly assembled page ID, then be initialized by the proper {@link PageIo} instance, and the
+     * non-zero {@code itemId} of the reused page ID must be saved into a special place.
+     *
+     * @param reusedPageId Reused page id.
+     * @param reusedPageAddr Reused page address.
+     * @param partId Partition id.
+     * @param flag Flag.
+     * @param initIo Initial io.
+     * @return Prepared page id.
+     */
+    protected final long initReusedPage(
+            long reusedPageId,
+            long reusedPageAddr,
+            int partId,
+            byte flag,
+            PageIo initIo
+    ) {
+        long newPageId = pageId(partId, flag, PageIdUtils.pageIndex(reusedPageId));
+
+        if (initIo != null) {
+            initIo.initNewPage(reusedPageAddr, newPageId, pageSize());
+        }
+
+        int itemId = itemId(reusedPageId);
+
+        if (itemId != 0) {
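+            // For data pages the old item ID is stored inside the page as the rotated ID part;
+            // for other page types it is encoded into the returned page ID (link).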
+            if (flag == FLAG_DATA) {
+                PageIo.setRotatedIdPart(reusedPageAddr, itemId);
+            } else {
+                newPageId = PageIdUtils.link(newPageId, itemId);
+            }
+        }
+
+        long storedPageId = getPageId(reusedPageAddr);
+
+        if (storedPageId != newPageId) {
+            PageIo.setPageId(reusedPageAddr, newPageId);
+        }
+
+        return newPageId;
+    }
+
+    /**
+     * Removes data page from bucket, merges bucket list if needed.
+     *
+     * @param dataId Data page ID.
+     * @param dataAddr Data page address.
+     * @param dataIo Data page IO.
+     * @param bucket Bucket index.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return {@code True} if page was removed.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    protected final boolean removeDataPage(
+            final long dataId,
+            final long dataAddr,
+            AbstractDataPageIo dataIo,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        final long pageId = dataIo.getFreeListPageId(dataAddr);
+
+        if (pageId == 0L) { // Page cached in onheap list.
+            assert isCachingApplicable() : "pageId==0L, but caching is not applicable for this pages list: " + name();
+
+            PagesCache pagesCache = getBucketCache(bucket, false);
+
+            // Pages cache can be null here if page was taken for put from free list concurrently.
+            if (pagesCache == null || !pagesCache.removePage(dataId)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Remove page from pages list cache failed [list=" + name() + ", bucket=" + bucket
+                            + ", dataId=" + dataId + "]: " + ((pagesCache == null) ? "cache is null" : "page not found"));
+                }
+
+                return false;
+            }
+
+            decrementBucketSize(bucket);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Remove page from pages list cache [list=" + name() + ", bucket=" + bucket
+                        + ", dataId=" + dataId + ']');
+            }
+
+            return true;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Remove page from pages list [list=" + name() + ", bucket=" + bucket + ", dataId=" + dataId
+                    + ", pageId=" + pageId + ']');
+        }
+
+        final long page = acquirePage(pageId, statHolder);
+
+        try {
+            long nextId;
+
+            long recycleId = 0L;
+
+            long pageAddr = writeLock(pageId, page); // Explicit check.
+
+            if (pageAddr == 0L) {
+                return false;
+            }
+
+            boolean rmvd = false;
+
+            try {
+                PagesListNodeIo io = PagesListNodeIo.VERSIONS.forPage(pageAddr);
+
+                rmvd = io.removePage(pageAddr, dataId);
+
+                if (!rmvd) {
+                    return false;
+                }
+
+                decrementBucketSize(bucket);
+
+                // Reset free list page ID.
+                dataIo.setFreeListPageId(dataAddr, 0L);
+
+                if (!io.isEmpty(pageAddr)) {
+                    return true; // In optimistic case we still have something in the page and can leave it as is.
+                }
+
+                // If the page is empty, we have to try to drop it and link next and previous with each other.
+                nextId = io.getNextId(pageAddr);
+
+                // If there is no next page, then we can try to merge without releasing the current write lock,
+                // because if we need to lock the previous page, the locking order will already be correct.
+                if (nextId == 0L) {
+                    long prevId = io.getPreviousId(pageAddr);
+
+                    recycleId = mergeNoNext(pageId, pageAddr, prevId, bucket, statHolder);
+                }
+            } finally {
+                writeUnlock(pageId, page, pageAddr, rmvd);
+            }
+
+            // Perform a fair merge after lock release (to have a correct locking order).
+            if (nextId != 0L) {
+                recycleId = merge(pageId, page, nextId, bucket, statHolder);
+            }
+
+            if (recycleId != 0L) {
+                reuseList.addForRecycle(new SingletonReuseBag(recycleId));
+            }
+
+            return true;
+        } finally {
+            releasePage(pageId, page);
+        }
+    }
+
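+    /**
+     * Merges the empty page when it has no next page: cuts the tail on the previous page (or drops the whole stripe)
+     * and recycles the page. Does nothing for the reuse bucket, where an empty tail page is normal.
+     *
+     * @param pageId Page ID.
+     * @param pageAddr Page address.
+     * @param prevId Previous page ID.
+     * @param bucket Bucket index.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return Page ID to recycle or {@code 0} if the page should not be recycled.
+     * @throws IgniteInternalCheckedException If failed.
+     */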
+    private long mergeNoNext(
+            long pageId,
+            long pageAddr,
+            long prevId,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        // If we do not have a next page (we are the tail) and we are on the reuse bucket,
+        // then we can leave it as is, because it is normal to have an empty tail page here.
+        if (isReuseBucket(bucket)) {
+            return 0L;
+        }
+
+        if (prevId != 0L) { // Cut tail if we have a previous page.
+            Boolean ok = write(prevId, cutTail, null, bucket, FALSE, statHolder);
+
+            assert ok == TRUE : ok;
+        } else {
+            // If we don't have a previous page, then we are the tail page of the free list; just drop the stripe.
+            boolean rmvd = updateTail(bucket, pageId, 0L);
+
+            if (!rmvd) {
+                return 0L;
+            }
+        }
+
+        return recyclePage(pageId, pageAddr);
+    }
+
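+    /**
+     * Merges the empty page with its next page. Locks the pages in the correct order (next page first, then the
+     * current one) and retries until a consistent state is observed.
+     *
+     * @param pageId Page ID.
+     * @param page Page absolute pointer.
+     * @param nextId Next page ID.
+     * @param bucket Bucket index.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return Page ID to recycle or {@code 0} if the page was merged or taken concurrently.
+     * @throws IgniteInternalCheckedException If failed.
+     */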
+    private long merge(
+            final long pageId,
+            final long page,
+            long nextId,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        assert nextId != 0; // We should do mergeNoNext then.
+
+        // Lock all the pages in correct order (from next to previous) and do the merge in retry loop.
+        for (; ; ) {
+            final long curId = nextId;
+            final long curPage = curId == 0L ? 0L : acquirePage(curId, statHolder);
+            try {
+                boolean write = false;
+
+                final long curAddr = curPage == 0L ? 0L : writeLock(curId, curPage); // Explicit check.
+                final long pageAddr = writeLock(pageId, page); // Explicit check.
+
+                if (pageAddr == 0L) {
+                    if (curAddr != 0L) {
+                        // Unlock next page if needed.
+                        writeUnlock(curId, curPage, curAddr, false);
+                    }
+
+                    return 0L; // Someone has merged or taken our empty page concurrently. Nothing to do here.
+                }
+
+                try {
+                    PagesListNodeIo io = PagesListNodeIo.VERSIONS.forPage(pageAddr);
+
+                    if (!io.isEmpty(pageAddr)) {
+                        return 0L; // No need to merge anymore.
+                    }
+
+                    // Check if we see a consistent state of the world.
+                    if (io.getNextId(pageAddr) == curId && (curId == 0L) == (curAddr == 0L)) {
+                        long recycleId = doMerge(pageId, pageAddr, io, curId, curAddr, bucket, statHolder);
+
+                        write = true;
+
+                        return recycleId; // Done.
+                    }
+
+                    // Reread next page ID and go for retry.
+                    nextId = io.getNextId(pageAddr);
+                } finally {
+                    if (curAddr != 0L) {
+                        writeUnlock(curId, curPage, curAddr, write);
+                    }
+
+                    writeUnlock(pageId, page, pageAddr, write);
+                }
+            } finally {
+                if (curPage != 0L) {
+                    releasePage(curId, curPage);
+                }
+            }
+        }
+    }
+
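+    /**
+     * Performs the actual merge of the empty page: unlinks it from its neighbors (or cuts the tail if there is no
+     * next page) and recycles it.
+     *
+     * @param pageId Page ID.
+     * @param pageAddr Page address.
+     * @param io IO.
+     * @param nextId Next page ID.
+     * @param nextAddr Next page address.
+     * @param bucket Bucket index.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return Page ID to recycle or {@code 0} if the page was not recycled.
+     * @throws IgniteInternalCheckedException If failed.
+     */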
+    private long doMerge(
+            long pageId,
+            long pageAddr,
+            PagesListNodeIo io,
+            long nextId,
+            long nextAddr,
+            int bucket,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        long prevId = io.getPreviousId(pageAddr);
+
+        if (nextId == 0L) {
+            return mergeNoNext(pageId, pageAddr, prevId, bucket, statHolder);
+        } else {
+            // No one must be able to merge it while we keep a reference.
+            assert getPageId(nextAddr) == nextId;
+
+            if (prevId == 0L) { // No previous page: we are at head.
+                // These references must be updated at the same time in write locks.
+                assert PagesListNodeIo.VERSIONS.forPage(nextAddr).getPreviousId(nextAddr) == pageId;
+
+                PagesListNodeIo nextIo = PagesListNodeIo.VERSIONS.forPage(nextAddr);
+                nextIo.setPreviousId(nextAddr, 0);
+            } else {
+                // Do a fair merge: link previous and next to each other.
+                fairMerge(prevId, pageId, nextId, nextAddr, statHolder);
+            }
+
+            return recyclePage(pageId, pageAddr);
+        }
+    }
+
+    /**
+     * Links the previous and next pages to each other.
+     *
+     * @param prevId Previous page ID.
+     * @param pageId Page ID.
+     * @param nextId Next page ID.
+     * @param nextAddr Next page address.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private void fairMerge(
+            final long prevId,
+            long pageId,
+            long nextId,
+            long nextAddr,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        long prevPage = acquirePage(prevId, statHolder);
+
+        try {
+            final long prevAddr = writeLock(prevId, prevPage); // No check, we keep a reference.
+            assert prevAddr != 0L;
+            try {
+                PagesListNodeIo prevIo = PagesListNodeIo.VERSIONS.forPage(prevAddr);
+                PagesListNodeIo nextIo = PagesListNodeIo.VERSIONS.forPage(nextAddr);
+
+                // These references must be updated at the same time in write locks.
+                assert prevIo.getNextId(prevAddr) == pageId;
+                assert nextIo.getPreviousId(nextAddr) == pageId;
+
+                prevIo.setNextId(prevAddr, nextId);
+
+                nextIo.setPreviousId(nextAddr, prevId);
+            } finally {
+                writeUnlock(prevId, prevPage, prevAddr, true);
+            }
+        } finally {
+            releasePage(prevId, prevPage);
+        }
+    }
+
+    /**
+     * Increments the bucket size.
+     *
+     * @param bucket Bucket number.
+     */
+    private void incrementBucketSize(int bucket) {
+        bucketsSize.incrementAndGet(bucket);
+    }
+
+    /**
+     * Decrements the bucket size.
+     *
+     * @param bucket Bucket number.
+     */
+    private void decrementBucketSize(int bucket) {
+        bucketsSize.decrementAndGet(bucket);
+    }
+
+    /**
+     * Mark free list was changed.
+     */
+    private void changed() {
+        // Ok to have a race here, see the field javadoc.
+        if (!changed) {
+            changed = true;
+        }
+    }
+
+    /**
+     * Mark free list page cache was changed.
+     */
+    private void pageCacheChanged() {
+        // Ok to have a race here, see the field javadoc.
+        if (!pageCacheChanged) {
+            pageCacheChanged = true;
+        }
+    }
+
+    /**
+     * Returns buckets count.
+     */
+    public int bucketsCount() {
+        return buckets;
+    }
+
+    /**
+     * Returns bucket size.
+     *
+     * @param bucket Bucket.
+     */
+    public long bucketSize(int bucket) {
+        return bucketsSize.get(bucket);
+    }
+
+    /**
+     * Returns stripes count.
+     *
+     * @param bucket Bucket.
+     */
+    public int stripesCount(int bucket) {
+        Stripe[] stripes = getBucket(bucket);
+
+        return stripes == null ? 0 : stripes.length;
+    }
+
+    /**
+     * Returns cached pages count.
+     *
+     * @param bucket Bucket.
+     */
+    public int cachedPagesCount(int bucket) {
+        PagesCache pagesCache = getBucketCache(bucket, false);
+
+        return pagesCache == null ? 0 : pagesCache.size();
+    }
+
+    /**
+     * Returns meta page id.
+     */
+    public long metaPageId() {
+        return metaPageId;
+    }
+
+    /**
+     * Returns {@link CorruptedFreeListException} that wraps original error and ids of possibly corrupted pages.
+     *
+     * @param err Error that caused this exception.
+     * @param pageIds Ids of possibly corrupted pages.
+     */
+    protected CorruptedFreeListException corruptedFreeListException(Throwable err, long... pageIds) {
+        return corruptedFreeListException(err.getMessage(), err, pageIds);
+    }
+
+    /**
+     * Returns {@link CorruptedFreeListException} that wraps original error and ids of possibly corrupted pages.
+     *
+     * @param msg Exception message.
+     * @param pageIds Ids of possibly corrupted pages.
+     */
+    protected CorruptedFreeListException corruptedFreeListException(String msg, long... pageIds) {
+        return corruptedFreeListException(msg, null, pageIds);
+    }
+
+    /**
+     * Returns {@link CorruptedFreeListException} that wraps original error and ids of possibly corrupted pages.
+     *
+     * @param msg Exception message.
+     * @param err Error that caused this exception.
+     * @param pageIds Ids of possibly corrupted pages.
+     */
+    protected CorruptedFreeListException corruptedFreeListException(String msg, @Nullable Throwable err, long... pageIds) {
+        return new CorruptedFreeListException(msg, err, grpId, pageIds);
+    }
+
+    /**
+     * Singleton reuse bag.
+     */
+    private static final class SingletonReuseBag implements ReuseBag {
+        /** Page ID. */
+        long pageId;
+
+        /**
+         * Constructor.
+         *
+         * @param pageId Page ID.
+         */
+        SingletonReuseBag(long pageId) {
+            this.pageId = pageId;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void addFreePage(long pageId) {
+            throw new IllegalStateException("Should never be called.");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public long pollFreePage() {
+            long res = pageId;
+
+            pageId = 0L;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean isEmpty() {
+            return pageId == 0L;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String toString() {
+            return S.toString(SingletonReuseBag.class, this, "pageId", hexLong(pageId));
+        }
+    }
+
+    /**
+     * Class to store page-list cache onheap.
+     */
+    public static class PagesCache {
+        /** Pages cache maximum size. */
+        public static final String IGNITE_PAGES_LIST_CACHING_MAX_CACHE_SIZE = "IGNITE_PAGES_LIST_CACHING_MAX_CACHE_SIZE";

Review comment:
       Please add a TODO to think about all these properties.
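
       A minimal illustrative sketch of how such a limit could be read, assuming it stays a plain JVM system property
       (the default value and field name below are hypothetical, not taken from the patch):

           // Hypothetical sketch: the default and the use of Integer.getInteger are assumptions, not the patch's code.
           private static final int DFLT_PAGES_CACHE_MAX_SIZE = 64;

           private final int pagesCacheMaxSize =
                   Integer.getInteger(IGNITE_PAGES_LIST_CACHING_MAX_CACHE_SIZE, DFLT_PAGES_CACHE_MAX_SIZE);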



