You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/02/12 10:13:07 UTC

[ignite] branch master updated: IGNITE-14138 Fixed an issue where historical rebalance can kill supplier node. Fixes #8769

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 84c6841  IGNITE-14138 Fixed an issue where historical rebalance can kill supplier node. Fixes #8769
84c6841 is described below

commit 84c684137fe577a151a28ff8d0849e8b98424bce
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Feb 12 13:12:23 2021 +0300

    IGNITE-14138 Fixed an issue where historical rebalance can kill supplier node. Fixes #8769
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   9 ++
 .../dht/preloader/GridDhtPartitionSupplier.java    |   5 +-
 .../GridCacheDatabaseSharedManager.java            |  13 +-
 .../cache/persistence/GridCacheOffheapManager.java |  24 +--
 .../persistence/checkpoint/CheckpointEntry.java    |  17 +-
 .../persistence/checkpoint/CheckpointHistory.java  |  25 ++-
 .../CacheRebalanceWithRemovedWalSegment.java       | 174 +++++++++++++++++++++
 .../db/wal/WalRecoveryTxLogicalRecordsTest.java    |  56 +++++--
 .../ignite/testsuites/IgnitePdsTestSuite4.java     |   2 +
 9 files changed, 282 insertions(+), 43 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 47dcb4a..6daca67 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -34,6 +34,7 @@ import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
 import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
 import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter;
@@ -1891,6 +1892,14 @@ public final class IgniteSystemProperties {
         "IGNITE_MASTER_KEY_NAME_TO_CHANGE_BEFORE_STARTUP";
 
     /**
+     * Disable group state lazy store. It means that group state won't be cached for {@link CheckpointEntry} and will be
+     * read from wal every time. Should be used for test purposes only.
+     */
+    @SystemProperty(value = "Disable group state lazy store. It means that group state won't be cached " +
+        "and will be read from wal every time", defaults = "false")
+    public static final String IGNITE_DISABLE_GRP_STATE_LAZY_STORE = "IGNITE_DISABLE_GRP_STATE_LAZY_STORE";
+
+    /**
      * Enables extended logging of indexes create/rebuild process. Default {@code false}.
      * <p/>
      * <b>Warning</b>: enabling that option can lead to performance degradation of index creation, rebuilding and  node
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index f52bf0e6..05e48c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -618,7 +618,10 @@ public class GridDhtPartitionSupplier {
      * @param demandMsg Demand message.
      */
     private String supplyRoutineInfo(int topicId, UUID demander, GridDhtPartitionDemandMessage demandMsg) {
-        return "grp=" + grp.cacheOrGroupName() + ", demander=" + demander + ", topVer=" + demandMsg.topologyVersion() + ", topic=" + topicId;
+        return "grp=" + grp.cacheOrGroupName() +
+            ", demander=" + demander +
+            ", topVer=" + demandMsg.topologyVersion() +
+            (topicId > 0 ? ", topic=" + topicId : "");
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a01ed06..83420eb 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1596,10 +1596,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 assert cctx.wal().reserved(cpEntry.checkpointMark())
                     : "WAL segment for checkpoint " + cpEntry + " has not reserved";
 
-                Long updCntr = cpEntry.partitionCounter(cctx.wal(), grpId, partId);
+                try {
+                    Long updCntr = cpEntry.partitionCounter(cctx.wal(), grpId, partId);
 
-                if (updCntr != null)
-                    grpPartsWithCnts.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, updCntr);
+                    if (updCntr != null)
+                        grpPartsWithCnts.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, updCntr);
+                }
+                catch (IgniteCheckedException ex) {
+                    log.warning("Reservation failed because counters are not available [grpId=" + grpId
+                        + ", part=" + partId
+                        + ", cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp()) + ")]", ex);
+                }
             }
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 7f4af90..376b14b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1171,11 +1171,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
         GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)grp.shared().database();
 
-        WALPointer latestReservedPointer = database.latestWalPointerReservedForPreloading();
-
-        if (latestReservedPointer == null)
-            throw new IgniteHistoricalIteratorException("Historical iterator wasn't created, because WAL isn't reserved.");
-
         Map<Integer, Long> partsCounters = new HashMap<>();
 
         for (int i = 0; i < partCntrs.size(); i++) {
@@ -1185,15 +1180,20 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             partsCounters.put(p, initCntr);
         }
 
-        WALPointer minPtr = database.checkpointHistory().searchEarliestWalPointer(grp.groupId(),
-            partsCounters, latestReservedPointer, grp.hasAtomicCaches() ? walAtomicCacheMargin : 0L);
+        try {
+            WALPointer minPtr = database.checkpointHistory().searchEarliestWalPointer(grp.groupId(),
+                partsCounters, grp.hasAtomicCaches() ? walAtomicCacheMargin : 0L);
+
+            WALPointer latestReservedPointer = database.latestWalPointerReservedForPreloading();
 
-        assert latestReservedPointer.compareTo(minPtr) <= 0
-            : "Historical iterator tries to iterate WAL out of reservation [cache=" + grp.cacheOrGroupName()
-            + ", reservedPointer=" + database.latestWalPointerReservedForPreloading()
-            + ", historicalPointer=" + minPtr + ']';
+            assert latestReservedPointer == null || latestReservedPointer.compareTo(minPtr) <= 0
+                : "Historical iterator tries to iterate WAL out of reservation [cache=" + grp.cacheOrGroupName()
+                + ", reservedPointer=" + latestReservedPointer
+                + ", historicalPointer=" + minPtr + ']';
+
+            if (latestReservedPointer == null)
+                log.warning("History for the preloading has not reserved yet.");
 
-        try {
             WALIterator it = grp.shared().wal().replay(minPtr);
 
             WALHistoricalIterator histIt = new WALHistoricalIterator(log, grp, partCntrs, partsCounters, it);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
index eef8dac..7dbb507 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.record.CacheState;
@@ -36,6 +37,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_GRP_STATE_LAZY_STORE;
+
 /**
  * Class represents checkpoint state.
  */
@@ -114,7 +117,7 @@ public class CheckpointEntry {
     private GroupStateLazyStore initIfNeeded(IgniteWriteAheadLogManager wal) throws IgniteCheckedException {
         GroupStateLazyStore store = grpStateLazyStore.get();
 
-        if (store == null) {
+        if (store == null || IgniteSystemProperties.getBoolean(IGNITE_DISABLE_GRP_STATE_LAZY_STORE, false)) {
             store = new GroupStateLazyStore();
 
             grpStateLazyStore = new SoftReference<>(store);
@@ -130,16 +133,10 @@ public class CheckpointEntry {
      * @param grpId Cache group ID.
      * @param part Partition ID.
      * @return Partition counter or {@code null} if not found.
+     * @throws IgniteCheckedException If something is wrong when loading the counter from WAL history.
      */
-    public Long partitionCounter(IgniteWriteAheadLogManager wal, int grpId, int part) {
-        GroupStateLazyStore store;
-
-        try {
-            store = initIfNeeded(wal);
-        }
-        catch (IgniteCheckedException e) {
-            return null;
-        }
+    public Long partitionCounter(IgniteWriteAheadLogManager wal, int grpId, int part) throws IgniteCheckedException {
+        GroupStateLazyStore store = initIfNeeded(wal);
 
         return store.partitionCounter(grpId, part);
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 2720072..784dd77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -439,14 +439,12 @@ public class CheckpointHistory {
      *
      * @param grpId Group id.
      * @param partsCounter Partition mapped to update counter.
-     * @param latestReservedPointer Latest reserved WAL pointer.
      * @param margin Margin pointer.
      * @return Earliest WAL pointer for group specified.
      */
     @Nullable public WALPointer searchEarliestWalPointer(
         int grpId,
         Map<Integer, Long> partsCounter,
-        WALPointer latestReservedPointer,
         long margin
     ) throws IgniteCheckedException {
         if (F.isEmpty(partsCounter))
@@ -467,6 +465,12 @@ public class CheckpointHistory {
 
             WALPointer ptr = cpEntry.checkpointMark();
 
+            if (!wal.reserved(ptr)) {
+                throw new IgniteCheckedException("WAL pointer appropriate to the checkpoint was not reserved " +
+                    "[cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp())
+                    + "), ptr=" + ptr + ']');
+            }
+
             while (iter.hasNext()) {
                 Map.Entry<Integer, Long> entry = iter.next();
 
@@ -494,7 +498,7 @@ public class CheckpointHistory {
                 }
             }
 
-            if ((F.isEmpty(modifiedPartsCounter) && F.isEmpty(historyPointerCandidate)) || ptr.compareTo(latestReservedPointer) == 0)
+            if (F.isEmpty(modifiedPartsCounter))
                 break;
         }
 
@@ -588,7 +592,16 @@ public class CheckpointHistory {
             long margin,
             Map<Integer, Long> partCntsForUpdate
         ) {
-            Long foundCntr = cpEntry == null ? null : cpEntry.partitionCounter(wal, grpId, part);
+            Long foundCntr = null;
+
+            try {
+                foundCntr = cpEntry == null ? null : cpEntry.partitionCounter(wal, grpId, part);
+            }
+            catch (IgniteCheckedException e) {
+                log.warning("Checkpoint cannot be chosen because counter is unavailable [grpId=" + grpId
+                    + ", part=" + part
+                    + ", cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp()) + ")]", e);
+            }
 
             if (foundCntr == null || foundCntr == walPntrCntr) {
                 partCntsForUpdate.put(part, walPntrCntr);
@@ -639,7 +652,9 @@ public class CheckpointHistory {
                 if (F.isEmpty(modifiedSearchMap))
                     return res;
             }
-            catch (IgniteCheckedException ignore) {
+            catch (IgniteCheckedException e) {
+                log.warning("Checkpoint data is unavailable in WAL [cpTs=" + U.format(cpTs) + ']', e);
+
                 break;
             }
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheRebalanceWithRemovedWalSegment.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheRebalanceWithRemovedWalSegment.java
new file mode 100644
index 0000000..7114fed
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheRebalanceWithRemovedWalSegment.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.io.File;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests the fallback from historical to full rebalance when WAL was corrupted after it had been reserved.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_DISABLE_GRP_STATE_LAZY_STORE, value = "true")
+public class CacheRebalanceWithRemovedWalSegment extends GridCommonAbstractTest {
+    /** Listening logger. */
+    private ListeningTestLogger listeningLog;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setGridLogger(listeningLog)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setWalSegmentSize(512 * 1024)
+                .setWalSegments(2)
+                .setCheckpointFrequency(600_000)
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setMaxSize(200L * 1024 * 1024)
+                    .setPersistenceEnabled(true)))
+            .setConsistentId(igniteInstanceName)
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, 16)));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        listeningLog = new ListeningTestLogger(log);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test() throws Exception {
+        IgniteEx ignite = startGrids(2);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        byte[] testVal = new byte[20 * 1024];
+
+        for (int i = 0; i < 300; i++)
+            cache.put(i, testVal);
+
+        forceCheckpoint();
+
+        ignite(1).close();
+
+        for (int i = 300; i < 500; i++)
+            cache.put(i, testVal);
+
+        forceCheckpoint();
+
+        stopAllGrids();
+
+        ignite = startGridWithBlockedDemandMessages(1, 0);
+        startGrid(0);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        GridDhtPartitionsExchangeFuture exchangeFuture = ignite(0).context().cache().context().exchange().lastTopologyFuture();
+
+        // Waiting for reservation, otherwise we can catch a problem during reservation.
+        exchangeFuture.get();
+
+        TestRecordingCommunicationSpi.spi(ignite).waitForBlocked();
+
+        File walPath = new File(
+            U.resolveWorkDirectory(
+                ignite(0).context().config().getWorkDirectory(),
+                DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH,
+                false
+            ),
+            ignite(0).context().pdsFolderResolver().resolveFolders().folderName()
+        );
+
+        for (File file : walPath.listFiles()) {
+            if (U.delete(file))
+                info("File deleted " + file);
+            else
+                info("Can't delete file " + file);
+        }
+
+        LogListener lsnr = LogListener.matches("Failed to continue supplying [grp=" + DEFAULT_CACHE_NAME
+            + ", demander=" + ignite.localNode().id()
+            + ", topVer=" + exchangeFuture.topologyVersion() + ']').build();
+
+        listeningLog.registerListener(lsnr);
+
+        TestRecordingCommunicationSpi.spi(ignite).stopBlock();
+
+        awaitPartitionMapExchange();
+
+        assertTrue(lsnr.check());
+
+        assertPartitionsSame(idleVerify(ignite, DEFAULT_CACHE_NAME));
+    }
+
+    /**
+     * Starts a node and blocks demand message sending to other one.
+     *
+     * @param nodeIdx Start node index.
+     * @param demandNodeIdx Demand node index.
+     * @return Started node Ignite instance.
+     * @throws Exception If failed.
+     */
+    private IgniteEx startGridWithBlockedDemandMessages(int nodeIdx, int demandNodeIdx) throws Exception {
+        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(nodeIdx));
+
+        TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+        spi.blockMessages(GridDhtPartitionDemandMessage.class, getTestIgniteInstanceName(demandNodeIdx));
+
+        return startGrid(cfg);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
index 9309631..50e4dae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
@@ -59,10 +60,12 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.PagesList;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -73,8 +76,6 @@ import org.apache.ignite.transactions.Transaction;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointStatus.NULL_PTR;
-
 /**
  *
  */
@@ -372,7 +373,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
                 map = new IgniteDhtDemandedPartitionsMap();
                 map.addHistorical(0, i, entries, PARTS);
 
-                GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+                WALPointer ptr = reserveWalPointerForIterator(grp.shared());
 
                 try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
                     assertNotNull(it);
@@ -391,13 +392,13 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
                     assertFalse(it.hasNext());
                 }
                 finally {
-                    GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+                    releaseWalPointerForIterator(grp.shared(), ptr);
                 }
 
                 map = new IgniteDhtDemandedPartitionsMap();
                 map.addHistorical(1, i, entries, PARTS);
 
-                GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+                ptr = reserveWalPointerForIterator(grp.shared());
 
                 try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
                     assertNotNull(it);
@@ -416,7 +417,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
                     assertFalse(it.hasNext());
                 }
                 finally {
-                    GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+                    releaseWalPointerForIterator(grp.shared(), ptr);
                 }
             }
 
@@ -437,7 +438,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
                 map = new IgniteDhtDemandedPartitionsMap();
                 map.addHistorical(0, i, entries, PARTS);
 
-                GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+                WALPointer ptr = reserveWalPointerForIterator(grp.shared());
 
                 try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
                     long end = System.currentTimeMillis();
@@ -466,13 +467,13 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
                     assertFalse(it.hasNext());
                 }
                 finally {
-                    GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+                    releaseWalPointerForIterator(grp.shared(), ptr);
                 }
 
                 map = new IgniteDhtDemandedPartitionsMap();
                 map.addHistorical(1, i, entries, PARTS);
 
-                GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+                ptr = reserveWalPointerForIterator(grp.shared());
 
                 try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
                     assertNotNull(it);
@@ -491,7 +492,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
                     assertFalse(it.hasNext());
                 }
                 finally {
-                    GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+                    releaseWalPointerForIterator(grp.shared(), ptr);
                 }
             }
         }
@@ -546,6 +547,37 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Reserves a WAL pointer for historical iterator.
+     *
+     * @param cctx Cache shared context.
+     * @return WAL pointer.
+     */
+    private WALPointer reserveWalPointerForIterator(GridCacheSharedContext cctx) {
+        final CheckpointHistory cpHist = ((GridCacheDatabaseSharedManager)cctx.database()).checkpointHistory();
+
+        WALPointer oldestPtr = cpHist.firstCheckpointPointer();
+
+        GridTestUtils.setFieldValue(cctx.database(), "reservedForPreloading", oldestPtr);
+
+        cctx.wal().reserve(oldestPtr);
+
+        return oldestPtr;
+    }
+
+    /**
+     * Releases a WAL pointer for historical iterator.
+     *
+     * @param cctx Cache shared context.
+     * @param ptr WAL pointer to release.
+     * @throws IgniteCheckedException If the release failed.
+     */
+    private void releaseWalPointerForIterator(GridCacheSharedContext cctx, WALPointer ptr) throws IgniteCheckedException {
+        GridTestUtils.setFieldValue(cctx.database(), "reservedForPreloading", null);
+
+        cctx.wal().release(ptr);
+    }
+
+    /**
      * @throws Exception If failed.
      */
     @Test
@@ -983,7 +1015,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
 
         List<CacheDataRow> rows = new ArrayList<>();
 
-        GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", NULL_PTR);
+        WALPointer ptr = reserveWalPointerForIterator(grp.shared());
 
         try (IgniteRebalanceIterator it = offh.rebalanceIterator(map, topVer)) {
             assertNotNull(it);
@@ -992,7 +1024,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
                 rows.add(it.next());
         }
         finally {
-            GridTestUtils.setFieldValue(grp.shared().database(), "reservedForPreloading", null);
+            releaseWalPointerForIterator(grp.shared(), ptr);
         }
 
         return rows;
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 2c04070..6c417d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cache.RebalanceCompleteDuringExchangeTest;
 import org.apache.ignite.cache.ResetLostPartitionTest;
 import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
 import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.CacheRebalanceWithRemovedWalSegment;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceOnCachesStoppingOrDestroyingTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.SupplyPartitionHistoricallyWithReorderedUpdates;
 import org.apache.ignite.internal.processors.cache.persistence.CorruptedTreeFailureHandlingTest;
@@ -123,6 +124,7 @@ public class IgnitePdsTestSuite4 {
 
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsConsistencyOnDelayedPartitionOwning.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, SupplyPartitionHistoricallyWithReorderedUpdates.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CacheRebalanceWithRemovedWalSegment.class, ignoredTests);
 
         // Warm-up tests.
         GridTestUtils.addTestIfNeeded(suite, WarmUpSelfTest.class, ignoredTests);