You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2019/08/29 09:55:06 UTC

[ignite] branch master updated: IGNITE-11584 Implement batch insertion of new cache entries in FreeList to improve rebalancing (#6364)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 560f4833 IGNITE-11584 Implement batch insertion of new cache entries in FreeList to improve rebalancing (#6364)
560f4833 is described below

commit 560f48332f511edd0bd1efdc1e4d372491ef8779
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Thu Aug 29 12:54:55 2019 +0300

    IGNITE-11584 Implement batch insertion of new cache entries in FreeList to improve rebalancing (#6364)
---
 .../processors/cache/GridCacheEntryEx.java         |  39 ++-
 .../processors/cache/GridCacheMapEntry.java        |  35 ++-
 .../cache/IgniteCacheOffheapManager.java           |  25 ++
 .../cache/IgniteCacheOffheapManagerImpl.java       |  61 +++++
 .../dht/preloader/GridDhtPartitionDemander.java    | 224 +++++++---------
 .../cache/persistence/DataRowCacheAware.java       |  68 +++++
 .../cache/persistence/GridCacheOffheapManager.java |  16 ++
 .../IgniteCacheDatabaseSharedManager.java          |  31 +--
 .../processors/cache/persistence/RowStore.java     |  15 +-
 .../persistence/evict/NoOpPageEvictionTracker.java |   5 +
 .../evict/PageAbstractEvictionTracker.java         |  15 ++
 .../persistence/evict/PageEvictionTracker.java     |   7 +
 .../persistence/freelist/AbstractFreeList.java     | 283 +++++++++++++++++----
 .../cache/persistence/freelist/FreeList.java       |  14 +-
 .../processors/cache/GridCacheTestEntryEx.java     |   3 +-
 .../processors/database/CacheFreeListSelfTest.java | 186 ++++++++++++--
 16 files changed, 785 insertions(+), 242 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 9aec399..71702ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -792,6 +792,42 @@ public interface GridCacheEntryEx {
      * @throws IgniteCheckedException In case of error.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
+    default boolean initialValue(CacheObject val,
+        GridCacheVersion ver,
+        @Nullable MvccVersion mvccVer,
+        @Nullable MvccVersion newMvccVer,
+        byte mvccTxState,
+        byte newMvccTxState,
+        long ttl,
+        long expireTime,
+        boolean preload, AffinityTopologyVersion topVer, GridDrType drType,
+        boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException {
+        // Delegates to the row-aware overload with no pre-created row. The MVCC versions and
+        // tx state hints must be forwarded as given, not replaced by null / TxState.NA.
+        return initialValue(val, ver, mvccVer, newMvccVer, mvccTxState, newMvccTxState,
+            ttl, expireTime, preload, topVer, drType, fromStore, null);
+    }
+
+    /**
+     * Sets new value if current version is <tt>0</tt>.
+     *
+     * @param val New value.
+     * @param ver Version to use.
+     * @param mvccVer Mvcc version.
+     * @param newMvccVer New mvcc version.
+     * @param mvccTxState Tx state hint for mvcc version.
+     * @param newMvccTxState Tx state hint for new mvcc version.
+     * @param ttl Time to live.
+     * @param expireTime Expiration time.
+     * @param preload Flag indicating whether entry is being preloaded.
+     * @param topVer Topology version.
+     * @param drType DR type.
+     * @param fromStore {@code True} if value was loaded from store.
+     * @param row Pre-created data row, associated with this cache entry.
+     * @return {@code True} if initial value was set.
+     * @throws IgniteCheckedException In case of error.
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     */
     public boolean initialValue(CacheObject val,
         GridCacheVersion ver,
         @Nullable MvccVersion mvccVer,
@@ -803,7 +839,8 @@ public interface GridCacheEntryEx {
         boolean preload,
         AffinityTopologyVersion topVer,
         GridDrType drType,
-        boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException;
+        boolean fromStore,
+        @Nullable CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
      * Create versioned entry for this cache entry.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 194c97a..725c01a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3310,7 +3310,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean preload,
         AffinityTopologyVersion topVer,
         GridDrType drType,
-        boolean fromStore
+        boolean fromStore,
+        CacheDataRow row
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         ensureFreeSpace();
 
@@ -3386,7 +3387,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         cctx.offheap().mvccInitialValue(this, val, ver, expTime, mvccVer, newMvccVer);
                     }
                     else
-                        storeValue(val, expTime, ver);
+                        storeValue(val, expTime, ver, null, row);
                 }
             }
             else {
@@ -3417,7 +3418,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
                 else
                     // Optimization to access storage only once.
-                    update = storeValue(val, expTime, ver, p);
+                    update = storeValue(val, expTime, ver, p, row);
             }
 
             if (update) {
@@ -4257,7 +4258,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     protected boolean storeValue(@Nullable CacheObject val,
         long expireTime,
         GridCacheVersion ver) throws IgniteCheckedException {
-        return storeValue(val, expireTime, ver, null);
+        return storeValue(val, expireTime, ver, null, null);
     }
 
     /**
@@ -4267,6 +4268,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param expireTime Expire time.
      * @param ver New entry version.
      * @param predicate Optional predicate.
+     * @param row Pre-created data row, associated with this cache entry.
      * @return {@code True} if storage was modified.
      * @throws IgniteCheckedException If update failed.
      */
@@ -4274,10 +4276,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable CacheObject val,
         long expireTime,
         GridCacheVersion ver,
-        @Nullable IgnitePredicate<CacheDataRow> predicate) throws IgniteCheckedException {
+        @Nullable IgnitePredicate<CacheDataRow> predicate,
+        @Nullable CacheDataRow row
+    ) throws IgniteCheckedException {
         assert lock.isHeldByCurrentThread();
 
-        UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate);
+        UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate, row);
 
         cctx.offheap().invoke(cctx, key, localPartition(), closure);
 
@@ -5730,12 +5734,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
          * @param predicate Optional predicate.
          */
         UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime,
-            @Nullable IgnitePredicate<CacheDataRow> predicate) {
+            @Nullable IgnitePredicate<CacheDataRow> predicate, @Nullable CacheDataRow newRow) {
             this.entry = entry;
             this.val = val;
             this.ver = ver;
             this.expireTime = expireTime;
             this.predicate = predicate;
+            this.newRow = newRow;
         }
 
         /** {@inheritDoc} */
@@ -5755,13 +5760,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             if (val != null) {
-                newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
-                    entry.cctx,
-                    entry.key,
-                    val,
-                    ver,
-                    expireTime,
-                    oldRow);
+                if (newRow == null) {
+                    newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
+                        entry.cctx,
+                        entry.key,
+                        val,
+                        ver,
+                        expireTime,
+                        oldRow);
+                }
 
                 treeOp = oldRow != null && oldRow.link() == newRow.link() ?
                     IgniteTree.OperationType.IN_PLACE : IgniteTree.OperationType.PUT;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index ab8d338..e73ad52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.cache.Cache;
@@ -29,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
@@ -47,6 +50,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 
@@ -504,6 +508,17 @@ public interface IgniteCacheOffheapManager {
         throws IgniteCheckedException;
 
     /**
+     * Store entries.
+     *
+     * @param partId Partition number.
+     * @param infos Entry infos.
+     * @param initPred Applied to all created rows. Each row that does not match the predicate is removed.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void storeEntries(int partId, Iterator<GridCacheEntryInfo> infos,
+        IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException;
+
+    /**
      * Clears offheap entries.
      *
      * @param cctx Cache context.
@@ -695,6 +710,16 @@ public interface IgniteCacheOffheapManager {
             @Nullable CacheDataRow oldRow) throws IgniteCheckedException;
 
         /**
+         * Insert rows into page memory.
+         *
+         * @param rows Rows.
+         * @param initPred Applied to all rows. Each row that does not match the predicate is removed.
+         * @throws IgniteCheckedException If failed.
+         */
+        public void insertRows(Collection<DataRowCacheAware> rows,
+            IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException;
+
+        /**
          * @param cctx Cache context.
          * @param cleanupRows Rows to cleanup.
          * @throws IgniteCheckedException If failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 57cca44..0d9b9fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
@@ -110,6 +112,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -122,6 +125,7 @@ import org.jetbrains.annotations.Nullable;
 import static java.lang.Boolean.TRUE;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.INITIAL_VERSION;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
@@ -145,6 +149,9 @@ import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT;
  *
  */
 public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager {
+    /** The maximum number of entries that can be preloaded under checkpoint read lock. */
+    public static final int PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK = 100;
+
     /** */
     private final boolean failNodeOnPartitionInconsistency = Boolean.getBoolean(
         IgniteSystemProperties.IGNITE_FAIL_NODE_ON_UNRECOVERABLE_PARTITION_INCONSISTENCY);
@@ -1207,6 +1214,37 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
+    @Override public void storeEntries(int partId, Iterator<GridCacheEntryInfo> infos,
+        IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException {
+        CacheDataStore dataStore = dataStore(partId);
+        // Rows are handed to the data store in chunks of at most PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK.
+        List<DataRowCacheAware> batch = new ArrayList<>(PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK);
+
+        while (infos.hasNext()) {
+            GridCacheEntryInfo info = infos.next();
+            // The ttl is not passed to the row built below, so only TTL_ETERNAL entries are accepted.
+            assert info.ttl() == TTL_ETERNAL : info.ttl();
+
+            batch.add(new DataRowCacheAware(info.key(),
+                info.value(),
+                info.version(),
+                partId,
+                info.expireTime(),
+                info.cacheId(),
+                grp.storeCacheIdInDataPage()));
+            // Flush a full chunk and reuse the same list for the next one.
+            if (batch.size() == PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK) {
+                dataStore.insertRows(batch, initPred);
+
+                batch.clear();
+            }
+        }
+        // Flush the trailing, partially filled chunk (if any).
+        if (!batch.isEmpty())
+            dataStore.insertRows(batch, initPred);
+    }
+
+    /** {@inheritDoc} */
     @Override public final CacheDataStore createCacheDataStore(int p) throws IgniteCheckedException {
         CacheDataStore dataStore;
 
@@ -1713,6 +1751,29 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             return dataRow;
         }
 
+        /** {@inheritDoc} */
+        @Override public void insertRows(Collection<DataRowCacheAware> rows,
+            IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException {
+            if (!busyLock.enterBusy())
+                throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+            // Only rows with a non-null value are passed to the row store below.
+            try {
+                rowStore.addRows(F.view(rows, row -> row.value() != null), grp.statisticsHolderData());
+
+                boolean cacheIdAwareGrp = grp.sharedGroup() || grp.storeCacheIdInDataPage();
+
+                for (DataRowCacheAware row : rows) {
+                    row.storeCacheId(cacheIdAwareGrp);
+                    // Rejected rows are removed again; value-less rows were filtered out of addRows above.
+                    if (!initPred.apply(row) && row.value() != null)
+                        rowStore.removeRow(row.link(), grp.statisticsHolderData());
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
         /**
          * @param key Cache key.
          * @param val Cache value.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 9386193..1b471f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -56,15 +56,15 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteInClosureX;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -80,6 +80,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOAD
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
+import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
@@ -752,10 +754,16 @@ public class GridDhtPartitionDemander {
                             try {
                                 Iterator<GridCacheEntryInfo> infos = e.getValue().infos().iterator();
 
-                                if (grp.mvccEnabled())
-                                    mvccPreloadEntries(topVer, node, p, infos);
-                                else
-                                    preloadEntries(topVer, node, p, infos);
+                                try {
+                                    if (grp.mvccEnabled())
+                                        mvccPreloadEntries(topVer, node, p, infos);
+                                    else
+                                        preloadEntries(topVer, p, infos);
+                                }
+                                catch (GridDhtInvalidPartitionException ignored) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+                                }
 
                                 fut.processed.get(p).increment();
 
@@ -890,7 +898,7 @@ public class GridDhtPartitionDemander {
             ctx.database().checkpointReadLock();
 
             try {
-                for (int i = 0; i < 100; i++) {
+                for (int i = 0; i < PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK; i++) {
                     boolean hasMore = infos.hasNext();
 
                     assert hasMore || !entryHist.isEmpty();
@@ -922,20 +930,9 @@ public class GridDhtPartitionDemander {
                         }
 
                         if (cctx != null) {
-                            if (!mvccPreloadEntry(cctx, node, entryHist, topVer, p)) {
-                                if (log.isTraceEnabled())
-                                    log.trace("Got entries for invalid partition during " +
-                                        "preloading (will skip) [p=" + p +
-                                        ", entry=" + entryHist.get(entryHist.size() - 1) + ']');
+                            mvccPreloadEntry(cctx, node, entryHist, topVer, p);
 
-                                return; // Skip current partition.
-                            }
-
-                            //TODO: IGNITE-11330: Update metrics for touched cache only.
-                            for (GridCacheContext ctx : grp.caches()) {
-                                if (ctx.statisticsEnabled())
-                                    ctx.cache().metrics0().onRebalanceKeyReceived();
-                            }
+                            updateGroupMetrics();
                         }
 
                         if (!hasMore)
@@ -954,140 +951,100 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * Adds entries with theirs history to partition p.
+     * Adds entries to partition p.
      *
-     * @param node Node which sent entry.
+     * @param topVer Topology version.
      * @param p Partition id.
      * @param infos Entries info for preload.
-     * @param topVer Topology version.
-     * @throws IgniteInterruptedCheckedException If interrupted.
+     * @throws IgniteCheckedException If failed.
      */
-    private void preloadEntries(AffinityTopologyVersion topVer, ClusterNode node, int p,
+    private void preloadEntries(AffinityTopologyVersion topVer, int p,
         Iterator<GridCacheEntryInfo> infos) throws IgniteCheckedException {
-        GridCacheContext cctx = null;
-
-        // Loop through all received entries and try to preload them.
-        while (infos.hasNext()) {
-            ctx.database().checkpointReadLock();
-
-            try {
-                for (int i = 0; i < 100; i++) {
-                    if (!infos.hasNext())
-                        break;
-
-                    GridCacheEntryInfo entry = infos.next();
-
-                    if (cctx == null || (grp.sharedGroup() && entry.cacheId() != cctx.cacheId())) {
-                        cctx = grp.sharedGroup() ? grp.shared().cacheContext(entry.cacheId()) : grp.singleCacheContext();
-
-                        if (cctx == null)
-                            continue;
-                        else if (cctx.isNear())
-                            cctx = cctx.dhtCache().context();
-                    }
 
-                    if (!preloadEntry(node, p, entry, topVer, cctx)) {
-                        if (log.isTraceEnabled())
-                            log.trace("Got entries for invalid partition during " +
-                                "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-
-                        return;
-                    }
-
-                    //TODO: IGNITE-11330: Update metrics for touched cache only.
-                    for (GridCacheContext ctx : grp.caches()) {
-                        if (ctx.statisticsEnabled())
-                            ctx.cache().metrics0().onRebalanceKeyReceived();
-                    }
-                }
-            }
-            finally {
-                ctx.database().checkpointReadUnlock();
+        grp.offheap().storeEntries(p, infos, new IgnitePredicateX<CacheDataRow>() {
+            @Override public boolean applyx(CacheDataRow row) throws IgniteCheckedException {
+                return preloadEntry(row, topVer);
             }
-        }
+        });
     }
 
     /**
      * Adds {@code entry} to partition {@code p}.
      *
-     * @param from Node which sent entry.
-     * @param p Partition id.
-     * @param entry Preloaded entry.
+     * @param row Data row.
      * @param topVer Topology version.
-     * @param cctx Cache context.
-     * @return {@code False} if partition has become invalid during preloading.
+     * @return {@code True} if the initial value was set for the specified cache entry.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    private boolean preloadEntry(
-        ClusterNode from,
-        int p,
-        GridCacheEntryInfo entry,
-        AffinityTopologyVersion topVer,
-        GridCacheContext cctx
-    ) throws IgniteCheckedException {
+    private boolean preloadEntry(CacheDataRow row, AffinityTopologyVersion topVer) throws IgniteCheckedException {
+        assert !grp.mvccEnabled();
         assert ctx.database().checkpointLockIsHeldByThread();
 
-        try {
-            GridCacheEntryEx cached = null;
+        updateGroupMetrics();
 
-            try {
-                cached = cctx.cache().entryEx(entry.key(), topVer);
+        GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(row.cacheId()) : grp.singleCacheContext();
 
-                if (log.isTraceEnabled()) {
-                    log.trace("Rebalancing key [key=" + entry.key() + ", part=" + p + ", fromNode=" +
-                        from.id() + ", grpId=" + grp.groupId() + ']');
-                }
+        if (cctx == null)
+            return false;
 
-                if (cached.initialValue(
-                    entry.value(),
-                    entry.version(),
-                    cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccVersion() : null,
-                    cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccVersion() : null,
-                    cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccTxState() : TxState.NA,
-                    cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccTxState() : TxState.NA,
-                    entry.ttl(),
-                    entry.expireTime(),
-                    true,
-                    topVer,
-                    cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
-                    false
-                )) {
-                    cached.touch(); // Start tracking.
+        cctx = cctx.isNear() ? cctx.dhtCache().context() : cctx;
 
-                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
-                        cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null,
-                            null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
-                            false, null, null, null, true);
-                }
-                else {
-                    cached.touch(); // Start tracking.
+        GridCacheEntryEx cached = cctx.cache().entryEx(row.key(), topVer);
 
-                    if (log.isTraceEnabled())
-                        log.trace("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
-                            ", part=" + p + ']');
-                }
+        try {
+            if (log.isTraceEnabled()) {
+                log.trace("Rebalancing key [key=" + cached.key() + ", part=" + cached.partition() +
+                    ", grpId=" + grp.groupId() + ']');
             }
-            catch (GridCacheEntryRemovedException ignored) {
-                if (log.isTraceEnabled())
-                    log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
-                        cached.key() + ", part=" + p + ']');
+
+            assert row.expireTime() >= 0 : row.expireTime();
+
+            if (cached.initialValue(
+                row.value(),
+                row.version(),
+                null,
+                null,
+                TxState.NA,
+                TxState.NA,
+                TTL_ETERNAL,
+                row.expireTime(),
+                true,
+                topVer,
+                cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
+                false,
+                row
+            )) {
+                cached.touch(); // Start tracking.
+
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+                    cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null,
+                        null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, row.value(), true, null,
+                        false, null, null, null, true);
+
+                return true;
             }
-            catch (GridDhtInvalidPartitionException ignored) {
-                if (log.isDebugEnabled())
-                    log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+            else {
+                cached.touch(); // Start tracking.
 
-                return false;
+                if (log.isTraceEnabled())
+                    log.trace("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+                        ", part=" + cached.partition() + ']');
             }
         }
+        catch (GridCacheEntryRemovedException ignored) {
+            if (log.isTraceEnabled())
+                log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+                    cached.key() + ", part=" + cached.partition() + ']');
+        }
         catch (IgniteInterruptedCheckedException e) {
             throw e;
         }
         catch (IgniteCheckedException e) {
-            throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
-                ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+            throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [" +
+                "key=" + row.key() + ", part=" + row.partition() + ']', e);
         }
 
-        return true;
+        return false;
     }
 
     /**
@@ -1098,7 +1055,7 @@ public class GridDhtPartitionDemander {
      * @param history Mvcc entry history.
      * @param topVer Topology version.
      * @param p Partition id.
-     * @return {@code False} if partition has become invalid during preloading.
+     * @return {@code True} if the initial value was set for the specified cache entry.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     private boolean mvccPreloadEntry(
@@ -1131,6 +1088,8 @@ public class GridDhtPartitionDemander {
                         cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null,
                             null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, null, true, null,
                             false, null, null, null, true);
+
+                    return true;
                 }
                 else {
                     cached.touch(); // Start tracking.
@@ -1145,12 +1104,6 @@ public class GridDhtPartitionDemander {
                     log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
                         cached.key() + ", part=" + p + ']');
             }
-            catch (GridDhtInvalidPartitionException ignored) {
-                if (log.isDebugEnabled())
-                    log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
-                return false;
-            }
         }
         catch (IgniteInterruptedCheckedException | ClusterTopologyCheckedException e) {
             throw e;
@@ -1160,7 +1113,7 @@ public class GridDhtPartitionDemander {
                 ctx.localNode() + ", node=" + from.id() + ", key=" + info.key() + ", part=" + p + ']', e);
         }
 
-        return true;
+        return false;
     }
 
     /**
@@ -1173,6 +1126,19 @@ public class GridDhtPartitionDemander {
         return "grp=" + grp.cacheOrGroupName() + ", topVer=" + supplyMsg.topologyVersion() + ", supplier=" + supplier;
     }
 
+    /**
+     * Update rebalancing metrics.
+     */
+    private void updateGroupMetrics() {
+        // TODO: IGNITE-11330: Update metrics for touched cache only.
+        // Due to historical rebalancing "EstimatedRebalancingKeys" metric is currently calculated for the whole cache
+        // group (by partition counters), so "RebalancedKeys" and "RebalancingKeysRate" are calculated in the same way.
+        for (GridCacheContext cctx0 : grp.caches()) {
+            if (cctx0.statisticsEnabled())
+                cctx0.cache().metrics0().onRebalanceKeyReceived();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtPartitionDemander.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRowCacheAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRowCacheAware.java
new file mode 100644
index 0000000..baee401
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRowCacheAware.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data row implementation that can optionally hide the cache identifier and can set {@code null} as value.<br> It is
+ * used to simplify storing a data row into page memory, because in some cases the cache identifier is not stored on the
+ * data pages, but is required to link this data row in {@code BPlusTree}.
+ */
+public class DataRowCacheAware extends DataRow {
+    /** Flag indicating whether cacheId should be stored in the data page. */
+    private boolean storeCacheId;
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @param ver Version.
+     * @param part Partition.
+     * @param expireTime Expire time.
+     * @param cacheId Cache ID.
+     * @param storeCacheId Flag indicating whether cacheId should be stored in the data page.
+     */
+    public DataRowCacheAware(KeyCacheObject key, @Nullable CacheObject val, GridCacheVersion ver, int part,
+        long expireTime, int cacheId, boolean storeCacheId) {
+        super(key, val, ver, part, expireTime, cacheId);
+
+        storeCacheId(storeCacheId);
+    }
+
+    /**
+     * @param storeCacheId Flag indicating whether cacheId should be stored in the data page.
+     */
+    public void storeCacheId(boolean storeCacheId) {
+        this.storeCacheId = storeCacheId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int cacheId() {
+        return storeCacheId ? cacheId : CU.UNDEFINED_CACHE_ID;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable CacheObject value() {
+        return val;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 0b027f1..bbddc1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -101,6 +102,7 @@ import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.lang.IgnitePredicateX;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -2383,6 +2385,20 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
+        @Override public void insertRows(Collection<DataRowCacheAware> rows,
+            IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException {
+            CacheDataStore delegate = init0(false);
+
+            ctx.database().checkpointReadLock();
+
+            try {
+                delegate.insertRows(rows, initPred);
+            } finally {
+                ctx.database().checkpointReadUnlock();
+            }
+        }
+
+        /** {@inheritDoc} */
         @Override public int cleanup(GridCacheContext cctx,
             @Nullable List<MvccLinkAwareSearchRow> cleanupRows) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index fb40901..aad30fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -994,35 +994,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
         if (memPlc == null)
             return;
 
-        DataRegionConfiguration plcCfg = memPlc.config();
+        while (memPlc.evictionTracker().evictionRequired()) {
+            warnFirstEvict(memPlc.config());
 
-        if (plcCfg.getPageEvictionMode() == DataPageEvictionMode.DISABLED || plcCfg.isPersistenceEnabled())
-            return;
-
-        long memorySize = plcCfg.getMaxSize();
-
-        PageMemory pageMem = memPlc.pageMemory();
-
-        int sysPageSize = pageMem.systemPageSize();
-
-        CacheFreeList freeList = freeListMap.get(plcCfg.getName());
-
-        for (;;) {
-            long allocatedPagesCnt = pageMem.loadedPages();
-
-            int emptyDataPagesCnt = freeList.emptyDataPages();
-
-            boolean shouldEvict = allocatedPagesCnt > (memorySize / sysPageSize * plcCfg.getEvictionThreshold()) &&
-                emptyDataPagesCnt < plcCfg.getEmptyPagesPoolSize();
-
-            if (shouldEvict) {
-                warnFirstEvict(plcCfg);
-
-                memPlc.evictionTracker().evictDataPage();
+            memPlc.evictionTracker().evictDataPage();
 
-                memPlc.memoryMetrics().updateEvictionRate();
-            } else
-                break;
+            memPlc.memoryMetrics().updateEvictionRate();
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index bdd1c2d..8601d98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.persistence;
 
+import java.util.Collection;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -26,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
 import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
-import org.apache.ignite.internal.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -118,6 +119,18 @@ public class RowStore {
     }
 
     /**
+     * @param rows Rows.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void addRows(Collection<? extends CacheDataRow> rows,
+        IoStatisticsHolder statHolder) throws IgniteCheckedException {
+        assert ctx.database().checkpointLockIsHeldByThread();
+
+        freeList.insertDataRows(rows, statHolder);
+    }
+
+    /**
      * @param link Row link.
      * @param row New row data.
      * @return {@code True} if was able to update row.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java
index b420ecd..1d0b171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java
@@ -47,4 +47,9 @@ public class NoOpPageEvictionTracker implements PageEvictionTracker {
     @Override public void forgetPage(long pageId) throws IgniteCheckedException {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean evictionRequired() {
+        return false;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java
index 41731b1..703fc6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
@@ -52,6 +53,9 @@ public abstract class PageAbstractEvictionTracker implements PageEvictionTracker
     /** Shared context. */
     private final GridCacheSharedContext sharedCtx;
 
+    /** Data region configuration. */
+    private final DataRegionConfiguration regCfg;
+
     /**
      * @param pageMem Page memory.
      * @param plcCfg Data region configuration.
@@ -66,12 +70,23 @@ public abstract class PageAbstractEvictionTracker implements PageEvictionTracker
 
         this.sharedCtx = sharedCtx;
 
+        regCfg = plcCfg;
+
         trackingSize = pageMem.totalPages();
 
         baseCompactTs = (U.currentTimeMillis() - DAY) >> COMPACT_TS_SHIFT;
         // We subtract day to avoid fail in case of daylight shift or timezone change.
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean evictionRequired() {
+        AbstractFreeList freeList = (AbstractFreeList)sharedCtx.database().freeList(regCfg.getName());
+
+        double pagesThreshold = regCfg.getEvictionThreshold() * regCfg.getMaxSize() / pageMem.systemPageSize();
+
+        return pageMem.loadedPages() > pagesThreshold && freeList.emptyDataPages() < regCfg.getEmptyPagesPoolSize();
+    }
+
     /**
      * @param pageIdx Page index.
      * @return true if at least one data row has been evicted
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java
index baa5462..fa17ea4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java
@@ -33,6 +33,13 @@ public interface PageEvictionTracker extends LifecycleAware {
     public void touchPage(long pageId) throws IgniteCheckedException;
 
     /**
+     * Check if page eviction is required according to the configured policy.
+     *
+     * @return {@code True} if eviction is required.
+     */
+    public boolean evictionRequired();
+
+    /**
      * Evicts one data page.
      * In most cases, all entries will be removed from the page.
      * Method guarantees removing at least one entry from "evicted" data page. Removing all entries may be
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
index b677116..cb32219 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.freelist;
 
+import java.util.Collection;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
 import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageUtils;
@@ -40,8 +43,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseB
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
 import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener;
-import org.apache.ignite.internal.metric.IoStatisticsHolder;
-import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.util.GridCursorIteratorWrapper;
+import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -130,13 +133,15 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
         }
     }
 
-    /** */
-    private final PageHandler<T, Integer> writeRow = new WriteRowHandler();
+    /** Write a single row on a single page. */
+    private final WriteRowHandler writeRowHnd = new WriteRowHandler();
 
-    /**
-     *
-     */
-    private final class WriteRowHandler extends PageHandler<T, Integer> {
+    /** Write multiple rows on a single page. */
+    private final WriteRowsHandler writeRowsHnd = new WriteRowsHandler();
+
+    /** Writes a single row (whole or as a fragment) to a single data page. */
+    private class WriteRowHandler extends PageHandler<T, Integer> {
+        /** {@inheritDoc} */
         @Override public Integer run(
             int cacheId,
             long pageId,
@@ -148,6 +153,31 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
             int written,
             IoStatisticsHolder statHolder)
             throws IgniteCheckedException {
+            written = addRow(pageId, page, pageAddr, iox, row, written);
+
+            putPage(((AbstractDataPageIO)iox).getFreeSpace(pageAddr), pageId, page, pageAddr, statHolder);
+
+            return written;
+        }
+
+        /**
+         * @param pageId Page ID.
+         * @param page Page absolute pointer.
+         * @param pageAddr Page address.
+         * @param iox IO.
+         * @param row Row to write.
+         * @param written Written size.
+         * @return Number of bytes written, {@link #COMPLETE} if the row was fully written.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected Integer addRow(
+            long pageId,
+            long page,
+            long pageAddr,
+            PageIO iox,
+            T row,
+            int written)
+            throws IgniteCheckedException {
             AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
 
             int rowSize = row.size();
@@ -156,18 +186,9 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
             assert oldFreeSpace > 0 : oldFreeSpace;
 
             // If the full row does not fit into this page write only a fragment.
-            written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) :
+            written = (written == 0 && oldFreeSpace >= rowSize) ? addRowFull(pageId, page, pageAddr, io, row, rowSize) :
                 addRowFragment(pageId, page, pageAddr, io, row, written, rowSize);
 
-            // Reread free space after update.
-            int newFreeSpace = io.getFreeSpace(pageAddr);
-
-            if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
-                int bucket = bucket(newFreeSpace, false);
-
-                put(null, pageId, page, pageAddr, bucket, statHolder);
-            }
-
             if (written == rowSize)
                 evictionTracker.touchPage(pageId);
 
@@ -185,7 +206,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
          * @return Written size which is always equal to row size here.
          * @throws IgniteCheckedException If failed.
          */
-        private int addRow(
+        protected int addRowFull(
             long pageId,
             long page,
             long pageAddr,
@@ -225,7 +246,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
          * @return Updated written size.
          * @throws IgniteCheckedException If failed.
          */
-        private int addRowFragment(
+        protected int addRowFragment(
             long pageId,
             long page,
             long pageAddr,
@@ -254,6 +275,66 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
 
             return written + payloadSize;
         }
+
+        /**
+         * Put page into the free list if needed.
+         *
+         * @param freeSpace Page free space.
+         * @param pageId Page ID.
+         * @param page Page pointer.
+         * @param pageAddr Page address.
+         * @param statHolder Statistics holder to track IO operations.
+         */
+        protected void putPage(int freeSpace, long pageId, long page, long pageAddr, IoStatisticsHolder statHolder)
+            throws IgniteCheckedException {
+            if (freeSpace > MIN_PAGE_FREE_SPACE) {
+                int bucket = bucket(freeSpace, false);
+
+                put(null, pageId, page, pageAddr, bucket, statHolder);
+            }
+        }
+    }
+
+    /** Writes multiple rows, taken from a cursor, to a single data page. */
+    private final class WriteRowsHandler extends PageHandler<GridCursor<T>, Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer run(
+            int cacheId,
+            long pageId,
+            long page,
+            long pageAddr,
+            PageIO iox,
+            Boolean walPlc,
+            GridCursor<T> cur,
+            int written,
+            IoStatisticsHolder statHolder)
+            throws IgniteCheckedException {
+            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
+
+            // Fill the page up to the end.
+            while (written != COMPLETE || (!evictionTracker.evictionRequired() && cur.next())) {
+                T row = cur.get();
+
+                if (written == COMPLETE) {
+                    // If the data row was completely written without remainder, proceed to the next.
+                    if ((written = writeWholePages(row, statHolder)) == COMPLETE)
+                        continue;
+
+                    if (io.getFreeSpace(pageAddr) < row.size() - written)
+                        break;
+                }
+
+                written = writeRowHnd.addRow(pageId, page, pageAddr, io, row, written);
+
+                assert written == COMPLETE;
+
+                evictionTracker.touchPage(pageId);
+            }
+
+            writeRowHnd.putPage(io.getFreeSpace(pageAddr), pageId, page, pageAddr, statHolder);
+
+            return written;
+        }
     }
 
     /** */
@@ -473,8 +554,6 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
 
     /** {@inheritDoc} */
     @Override public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException {
-        int rowSize = row.size();
-
         int written = 0;
 
         try {
@@ -482,62 +561,168 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
                 if (written != 0)
                     memMetrics.incrementLargeEntriesPages();
 
-                int remaining = rowSize - written;
+                written = writeSinglePage(row, written, statHolder);
+            }
+            while (written != COMPLETE);
+        }
+        catch (IgniteCheckedException | Error e) {
+            throw e;
+        }
+        catch (Throwable t) {
+            throw new CorruptedFreeListException("Failed to insert data row", t);
+        }
+    }
 
-                long pageId = 0L;
+    /**
+     * Reduces the workload on the free list by writing multiple rows into a single memory page at once.<br>
+     * <br>
+     * Rows are sequentially added to the page as long as there is enough free space on it. If the row is large then
+     * those fragments that occupy the whole memory page are written to other pages, and the remainder is added to the
+     * current one.
+     *
+     * @param rows Rows.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Override public void insertDataRows(Collection<T> rows,
+        IoStatisticsHolder statHolder) throws IgniteCheckedException {
+        try {
+            GridCursor<T> cur = new GridCursorIteratorWrapper<>(rows.iterator());
 
-                if (remaining < MIN_SIZE_FOR_DATA_PAGE) {
-                    for (int b = bucket(remaining, false) + 1; b < BUCKETS - 1; b++) {
-                        pageId = takeEmptyPage(b, row.ioVersions(), statHolder);
+            int written = COMPLETE;
 
-                        if (pageId != 0L)
-                            break;
-                    }
+            while (written != COMPLETE || cur.next()) {
+                T row = cur.get();
+
+                // If eviction is required - free up memory before locking the next page.
+                while (evictionTracker.evictionRequired()) {
+                    evictionTracker.evictDataPage();
+
+                    memMetrics.updateEvictionRate();
                 }
 
-                if (pageId == 0L) { // Handle reuse bucket.
-                    if (reuseList == this)
-                        pageId = takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder);
-                    else
-                        pageId = reuseList.takeRecycledPage();
+                if (written == COMPLETE) {
+                    written = writeWholePages(row, statHolder);
+
+                    continue;
                 }
 
                 AbstractDataPageIO initIo = null;
 
+                long pageId = takePage(row.size() - written, row, statHolder);
+
                 if (pageId == 0L) {
                     pageId = allocateDataPage(row.partition());
 
                     initIo = row.ioVersions().latest();
                 }
-                else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket.
-                    pageId = initReusedPage(row, pageId, row.partition(), statHolder);
-                else // Page is taken from free space bucket. For in-memory mode partition must be changed.
-                    pageId = PageIdUtils.changePartitionId(pageId, (row.partition()));
 
-                written = write(pageId, writeRow, initIo, row, written, FAIL_I, statHolder);
+                written = write(pageId, writeRowsHnd, initIo, cur, written, FAIL_I, statHolder);
 
                 assert written != FAIL_I; // We can't fail here.
             }
-            while (written != COMPLETE);
         }
-        catch (IgniteCheckedException | Error e) {
-            throw e;
+        catch (RuntimeException e) {
+            throw new CorruptedFreeListException("Failed to insert data rows", e);
         }
-        catch (Throwable t) {
-            throw new CorruptedFreeListException("Failed to insert data row", t);
+    }
+
+    /**
+     * Write those fragments of the row that each occupy a whole memory page. A data row is ignored if it is smaller
+     * than the max payload of an empty data page.
+     *
+     * @param row Row to process.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return Number of bytes written, {@link #COMPLETE} if the row was fully written, {@code 0} if data row was
+     * ignored because it is less than the max payload of an empty data page.
+     * @throws IgniteCheckedException If failed.
+     */
+    private int writeWholePages(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException {
+        assert row.link() == 0 : row.link();
+
+        int written = 0;
+        int rowSize = row.size();
+
+        while (rowSize - written >= MIN_SIZE_FOR_DATA_PAGE) {
+            written = writeSinglePage(row, written, statHolder);
+
+            memMetrics.incrementLargeEntriesPages();
         }
+
+        return written;
     }
 
     /**
+     * Take a page and write the row (or its next fragment) to it.
+     *
+     * @param row Row to write.
+     * @param written Written size.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return Number of bytes written, {@link #COMPLETE} if the row was fully written.
+     * @throws IgniteCheckedException If failed.
+     */
+    private int writeSinglePage(T row, int written, IoStatisticsHolder statHolder) throws IgniteCheckedException {
+        AbstractDataPageIO initIo = null;
+
+        long pageId = takePage(row.size() - written, row, statHolder);
+
+        if (pageId == 0L) {
+            pageId = allocateDataPage(row.partition());
+
+            initIo = row.ioVersions().latest();
+        }
+
+        written = write(pageId, writeRowHnd, initIo, row, written, FAIL_I, statHolder);
+
+        assert written != FAIL_I; // We can't fail here.
+
+        return written;
+    }
+
+    /**
+     * Take page from free list.
+     *
+     * @param size Required free space on page.
+     * @param row Row to write.
+     * @param statHolder Statistics holder to track IO operations.
+     * @return Page identifier or 0 if no page found in free list.
+     * @throws IgniteCheckedException If failed.
+     */
+    private long takePage(int size, T row, IoStatisticsHolder statHolder) throws IgniteCheckedException {
+        long pageId = 0;
+
+        if (size < MIN_SIZE_FOR_DATA_PAGE) {
+            for (int b = bucket(size, false) + 1; b < REUSE_BUCKET; b++) {
+                pageId = takeEmptyPage(b, row.ioVersions(), statHolder);
+
+                if (pageId != 0L)
+                    break;
+            }
+        }
+
+        if (pageId == 0L) { // Handle reuse bucket.
+            pageId = reuseList == this ?
+                takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder) : reuseList.takeRecycledPage();
+        }
+
+        if (pageId == 0L)
+            return 0;
+
+        if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket.
+            return initReusedPage(row, pageId, statHolder);
+        else // Page is taken from free space bucket. For in-memory mode partition must be changed.
+            return PageIdUtils.changePartitionId(pageId, row.partition());
+    }
+
+    /**
+     * @param row Row.
      * @param reusedPageId Reused page id.
-     * @param partId Partition id.
      * @param statHolder Statistics holder to track IO operations.
      * @return Prepared page id.
      *
      * @see PagesList#initReusedPage(long, long, long, int, byte, PageIO)
      */
-    private long initReusedPage(T row, long reusedPageId, int partId,
-        IoStatisticsHolder statHolder) throws IgniteCheckedException {
+    private long initReusedPage(T row, long reusedPageId, IoStatisticsHolder statHolder) throws IgniteCheckedException {
         long reusedPage = acquirePage(reusedPageId, statHolder);
         try {
             long reusedPageAddr = writeLock(reusedPageId, reusedPage);
@@ -546,7 +731,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
 
             try {
                 return initReusedPage(reusedPageId, reusedPage, reusedPageAddr,
-                    partId, PageIdAllocator.FLAG_DATA, row.ioVersions().latest());
+                    row.partition(), PageIdAllocator.FLAG_DATA, row.ioVersions().latest());
             }
             finally {
                 writeUnlock(reusedPageId, reusedPage, reusedPageAddr, true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
index 28f5a50..5914ae52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
@@ -17,24 +17,34 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.freelist;
 
+import java.util.Collection;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.processors.cache.persistence.Storable;
 import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
-import org.apache.ignite.internal.metric.IoStatisticsHolder;
 
 /**
  */
 public interface FreeList<T extends Storable> {
     /**
      * @param row Row.
+     * @param statHolder Statistics holder to track IO operations.
      * @throws IgniteCheckedException If failed.
      */
     public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException;
 
     /**
+     * @param rows Rows to insert.
+     * @param statHolder Statistics holder to track IO operations.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void insertDataRows(Collection<T> rows, IoStatisticsHolder statHolder) throws IgniteCheckedException;
+
+    /**
      * @param link Row link.
      * @param row New row data.
+     * @param statHolder Statistics holder to track IO operations.
      * @return {@code True} if was able to update row.
      * @throws IgniteCheckedException If failed.
      */
@@ -46,6 +56,7 @@ public interface FreeList<T extends Storable> {
      * @param arg Handler argument.
      * @param <S> Argument type.
      * @param <R> Result type.
+     * @param statHolder Statistics holder to track IO operations.
      * @return Result.
      * @throws IgniteCheckedException If failed.
      */
@@ -54,6 +65,7 @@ public interface FreeList<T extends Storable> {
 
     /**
      * @param link Row link.
+     * @param statHolder Statistics holder to track IO operations.
      * @throws IgniteCheckedException If failed.
      */
     public void removeDataRowByLink(long link, IoStatisticsHolder statHolder) throws IgniteCheckedException;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 8d1ab87..f6900b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -711,7 +711,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         boolean preload,
         AffinityTopologyVersion topVer,
         GridDrType drType,
-        boolean fromStore
+        boolean fromStore,
+        CacheDataRow row
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert false;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
index 6364590..eae81c7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.processors.database;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -74,6 +76,9 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
     private static final long MB = 1024L * 1024L;
 
     /** */
+    private static final int BATCH_SIZE = 100;
+
+    /** */
     private PageMemory pageMem;
 
     /** {@inheritDoc} */
@@ -90,6 +95,46 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     @Test
+    public void testInsertDeleteSingleThreaded_batched_1024() throws Exception {
+        checkInsertDeleteSingleThreaded(1024, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testInsertDeleteSingleThreaded_batched_2048() throws Exception {
+        checkInsertDeleteSingleThreaded(2048, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testInsertDeleteSingleThreaded_batched_4096() throws Exception {
+        checkInsertDeleteSingleThreaded(4096, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testInsertDeleteSingleThreaded_batched_8192() throws Exception {
+        checkInsertDeleteSingleThreaded(8192, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testInsertDeleteSingleThreaded_batched_16384() throws Exception {
+        checkInsertDeleteSingleThreaded(16384, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
     public void testInsertDeleteSingleThreaded_1024() throws Exception {
         checkInsertDeleteSingleThreaded(1024);
     }
@@ -167,15 +212,64 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testInsertDeleteMultiThreaded_batched_1024() throws Exception {
+        checkInsertDeleteMultiThreaded(1024, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testInsertDeleteMultiThreaded_batched_2048() throws Exception {
+        checkInsertDeleteMultiThreaded(2048, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testInsertDeleteMultiThreaded_batched_4096() throws Exception {
+        checkInsertDeleteMultiThreaded(4096, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testInsertDeleteMultiThreaded_batched_8192() throws Exception {
+        checkInsertDeleteMultiThreaded(8192, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testInsertDeleteMultiThreaded_batched_16384() throws Exception {
+        checkInsertDeleteMultiThreaded(16384, true);
+    }
+
+    /**
+     * @param pageSize Page size.
+     * @throws Exception if failed.
+     */
+    protected void checkInsertDeleteMultiThreaded(int pageSize) throws Exception {
+        checkInsertDeleteMultiThreaded(pageSize, false);
+    }
+
+    /**
      * @param pageSize Page size.
+     * @param batched If {@code true}, the main loop inserts rows in batches of {@code BATCH_SIZE}; otherwise one at a time.
      * @throws Exception If failed.
      */
-    protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Exception {
-        final FreeList list = createFreeList(pageSize);
+    protected void checkInsertDeleteMultiThreaded(final int pageSize, final boolean batched) throws Exception {
+        final FreeList<CacheDataRow> list = createFreeList(pageSize);
 
         Random rnd = new Random();
 
-        final ConcurrentMap<Long, TestDataRow> stored = new ConcurrentHashMap<>();
+        final ConcurrentMap<Long, CacheDataRow> stored = new ConcurrentHashMap<>();
 
         for (int i = 0; i < 100; i++) {
             int keySize = rnd.nextInt(pageSize * 3 / 2) + 10;
@@ -187,7 +281,7 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
 
             assertTrue(row.link() != 0L);
 
-            TestDataRow old = stored.put(row.link(), row);
+            CacheDataRow old = stored.put(row.link(), row);
 
             assertNull(old);
         }
@@ -196,6 +290,8 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
 
         GridTestUtils.runMultiThreaded(new Callable<Object>() {
             @Override public Object call() throws Exception {
+                List<CacheDataRow> rows = new ArrayList<>(BATCH_SIZE);
+
                 Random rnd = ThreadLocalRandom.current();
 
                 for (int i = 0; i < 200_000; i++) {
@@ -226,25 +322,45 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
 
                         TestDataRow row = new TestDataRow(keySize, valSize);
 
+                        if (batched) {
+                            rows.add(row);
+
+                            if (rows.size() == BATCH_SIZE) {
+                                list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE);
+
+                                for (CacheDataRow row0 : rows) {
+                                    assertTrue(row0.link() != 0L);
+
+                                    CacheDataRow old = stored.put(row0.link(), row0);
+
+                                    assertNull(old);
+                                }
+
+                                rows.clear();
+                            }
+
+                            continue;
+                        }
+
                         list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE);
 
                         assertTrue(row.link() != 0L);
 
-                        TestDataRow old = stored.put(row.link(), row);
+                        CacheDataRow old = stored.put(row.link(), row);
 
                         assertNull(old);
                     }
                     else {
-                        while (true) {
-                            Iterator<TestDataRow> it = stored.values().iterator();
+                        while (!stored.isEmpty()) {
+                            Iterator<CacheDataRow> it = stored.values().iterator();
 
                             if (it.hasNext()) {
-                                TestDataRow row = it.next();
+                                CacheDataRow row = it.next();
 
-                                TestDataRow rmvd = stored.remove(row.link);
+                                CacheDataRow rmvd = stored.remove(row.link());
 
                                 if (rmvd != null) {
-                                    list.removeDataRowByLink(row.link, IoStatisticsHolderNoOp.INSTANCE);
+                                    list.removeDataRowByLink(row.link(), IoStatisticsHolderNoOp.INSTANCE);
 
                                     break;
                                 }
@@ -259,14 +375,24 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param pageSize Page size.
      * @throws Exception if failed.
      */
     protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception {
-        FreeList list = createFreeList(pageSize);
+        checkInsertDeleteSingleThreaded(pageSize, false);
+    }
+
+    /**
+     * @param pageSize Page size.
+     * @param batched If {@code true}, the main loop inserts rows in batches of {@code BATCH_SIZE}; otherwise one at a time.
+     * @throws Exception if failed.
+     */
+    protected void checkInsertDeleteSingleThreaded(int pageSize, boolean batched) throws Exception {
+        FreeList<CacheDataRow> list = createFreeList(pageSize);
 
         Random rnd = new Random();
 
-        Map<Long, TestDataRow> stored = new HashMap<>();
+        Map<Long, CacheDataRow> stored = new HashMap<>();
 
         for (int i = 0; i < 100; i++) {
             int keySize = rnd.nextInt(pageSize * 3 / 2) + 10;
@@ -278,13 +404,15 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
 
             assertTrue(row.link() != 0L);
 
-            TestDataRow old = stored.put(row.link(), row);
+            CacheDataRow old = stored.put(row.link(), row);
 
             assertNull(old);
         }
 
         boolean grow = true;
 
+        List<CacheDataRow> rows = new ArrayList<>(BATCH_SIZE);
+
         for (int i = 0; i < 1_000_000; i++) {
             if (grow) {
                 if (stored.size() > 20_000) {
@@ -309,25 +437,45 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
 
                 TestDataRow row = new TestDataRow(keySize, valSize);
 
+                if (batched) {
+                    rows.add(row);
+
+                    if (rows.size() == BATCH_SIZE) {
+                        list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE);
+
+                        for (CacheDataRow row0 : rows) {
+                            assertTrue(row0.link() != 0L);
+
+                            CacheDataRow old = stored.put(row0.link(), row0);
+
+                            assertNull(old);
+                        }
+
+                        rows.clear();
+                    }
+
+                    continue;
+                }
+
                 list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE);
 
                 assertTrue(row.link() != 0L);
 
-                TestDataRow old = stored.put(row.link(), row);
+                CacheDataRow old = stored.put(row.link(), row);
 
                 assertNull(old);
             }
             else {
-                Iterator<TestDataRow> it = stored.values().iterator();
+                Iterator<CacheDataRow> it = stored.values().iterator();
 
                 if (it.hasNext()) {
-                    TestDataRow row = it.next();
+                    CacheDataRow row = it.next();
 
-                    TestDataRow rmvd = stored.remove(row.link);
+                    CacheDataRow rmvd = stored.remove(row.link());
 
                     assertTrue(rmvd == row);
 
-                    list.removeDataRowByLink(row.link, IoStatisticsHolderNoOp.INSTANCE);
+                    list.removeDataRowByLink(row.link(), IoStatisticsHolderNoOp.INSTANCE);
                 }
             }
         }
@@ -355,7 +503,7 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest {
      * @return Free list.
      * @throws Exception If failed.
      */
-    protected FreeList createFreeList(int pageSize) throws Exception {
+    protected FreeList<CacheDataRow> createFreeList(int pageSize) throws Exception {
         DataRegionConfiguration plcCfg = new DataRegionConfiguration()
             .setInitialSize(1024 * MB)
             .setMaxSize(1024 * MB);