You are viewing a plain-text version of this message. The link to its canonical version ("here") was not preserved in this copy.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/07/07 06:47:52 UTC

[ignite] branch master updated: IGNITE-12935 Improved logging for historical rebalancing. - Fixes #7722.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f55901d  IGNITE-12935 Improved logging for historical rebalancing. - Fixes #7722.
f55901d is described below

commit f55901d3de2148227b102e5f5260bc637617f261
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Jul 7 09:41:32 2020 +0300

    IGNITE-12935 Improved logging for historical rebalancing. - Fixes #7722.
    
    Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
 .../preloader/GridDhtPartitionsExchangeFuture.java |  91 ++++++-
 .../dht/preloader/SupplyPartitionInfo.java         |  99 ++++++++
 .../GridCacheDatabaseSharedManager.java            |  65 ++++-
 .../persistence/checkpoint/CheckpointHistory.java  |  29 ++-
 .../persistence/checkpoint/ReservationReason.java  |  50 ++++
 .../db/wal/IgniteWalRebalanceLoggingTest.java      | 275 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsMvccTestSuite2.java |   2 +
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   3 +
 .../processors/client/IgniteDataStreamerTest.java  |   2 +
 9 files changed, 598 insertions(+), 18 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a34a098..21b3d26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3349,8 +3349,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      *
      * @param top Topology to assign.
      * @param resetOwners True if need to reset partition state considering of counter, false otherwise.
+     * @return Partitions supply info list.
      */
-    private void assignPartitionStates(GridDhtPartitionTopology top, boolean resetOwners) {
+    private List<SupplyPartitionInfo> assignPartitionStates(GridDhtPartitionTopology top, boolean resetOwners) {
         Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
         Map<Integer, TreeSet<Long>> varCntrs = new HashMap<>();
 
@@ -3424,10 +3425,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         Set<Integer> haveHistory = new HashSet<>();
 
-        assignHistoricalSuppliers(top, maxCntrs, varCntrs, haveHistory);
+        List<SupplyPartitionInfo> list = assignHistoricalSuppliers(top, maxCntrs, varCntrs, haveHistory);
 
         if (resetOwners)
             resetOwnersByCounter(top, maxCntrs, haveHistory);
+
+        return list;
     }
 
     /**
@@ -3470,8 +3473,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param maxCntrs Max counter partiton map.
      * @param varCntrs Various counters for each partition.
      * @param haveHistory Set of partitions witch have historical supplier.
+     * @return List of partitions which does not have historical supplier.
      */
-    private void assignHistoricalSuppliers(
+    private List<SupplyPartitionInfo> assignHistoricalSuppliers(
         GridDhtPartitionTopology top,
         Map<Integer, CounterWithNodes> maxCntrs,
         Map<Integer, TreeSet<Long>> varCntrs,
@@ -3481,6 +3485,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null;
 
+        List<SupplyPartitionInfo> list = new ArrayList<>();
+
         for (Map.Entry<Integer, TreeSet<Long>> e : varCntrs.entrySet()) {
             int p = e.getKey();
 
@@ -3529,7 +3535,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         deepestReserved.set(e0.getKey(), histCntr);
                 }
             }
+
+            // No one reservation matched for this partition.
+            if (!haveHistory.contains(p)) {
+                list.add(new SupplyPartitionInfo(
+                    p,
+                    nonMaxCntrs.last(),
+                    deepestReserved.get2(),
+                    deepestReserved.get1()
+                ));
+            }
         }
+
+        return list;
     }
 
     /**
@@ -4124,6 +4142,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param resetOwners True if reset partitions state needed, false otherwise.
      */
     private void assignPartitionsStates(boolean resetOwners) {
+        Map<String, List<SupplyPartitionInfo>> supplyInfoMap = log.isInfoEnabled() ?
+            new ConcurrentHashMap<>() : null;
+
         try {
             U.doInParallel(
                 cctx.kernalContext().getSystemExecutorService(),
@@ -4135,8 +4156,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         ? grpCtx.topology()
                         : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
 
-                    if (CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
-                        assignPartitionStates(top, resetOwners);
+                    if (CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration())) {
+                        List<SupplyPartitionInfo> list = assignPartitionStates(top, resetOwners);
+
+                        if (supplyInfoMap != null && !F.isEmpty(list))
+                            supplyInfoMap.put(grpDesc.cacheOrGroupName(), list);
+                    }
                     else if (resetOwners)
                         assignPartitionSizes(top);
 
@@ -4148,10 +4173,66 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             throw new IgniteException("Failed to assign partition states", e);
         }
 
+        if (!F.isEmpty(supplyInfoMap))
+            printPartitionRebalancingFully(supplyInfoMap);
+
         timeBag.finishGlobalStage("Assign partitions states");
     }
 
     /**
+     * Prints detail information about partitions which did not have reservation
+     * history enough for historical rebalance.
+     *
+     * @param supplyInfoMap Map contains information about supplying partitions.
+     */
+    private void printPartitionRebalancingFully(Map<String, List<SupplyPartitionInfo>> supplyInfoMap) {
+        try {
+            if (hasPartitionToLog(supplyInfoMap, false)) {
+                log.info("Partitions weren't present in any history reservation: [" +
+                    supplyInfoMap.entrySet().stream().map(entry ->
+                        "[grp=" + entry.getKey() + " part=[" + S.compact(entry.getValue().stream()
+                            .filter(info -> !info.isHistoryReserved())
+                            .map(info -> info.part()).collect(Collectors.toSet())) + "]]"
+                    ).collect(Collectors.joining(", ")) + ']');
+            }
+
+            if (hasPartitionToLog(supplyInfoMap, true)) {
+                log.info("Partitions were reserved, but maximum available counter is greater than demanded: [" +
+                    supplyInfoMap.entrySet().stream().map(entry ->
+                        "[grp=" + entry.getKey() + ' ' +
+                            entry.getValue().stream().filter(SupplyPartitionInfo::isHistoryReserved).map(info ->
+                                "[part=" + info.part() +
+                                    ", minCntr=" + info.minCntr() +
+                                    ", maxReserved=" + info.maxReserved() +
+                                    ", maxReservedNodeId=" + info.maxReservedNodeId() + ']'
+                            ).collect(Collectors.joining(", ")) + ']'
+                    ).collect(Collectors.joining(", ")) + ']');
+            }
+        }
+        catch (Exception e) {
+            log.error("An error happened during printing partitions that have no history.", e);
+        }
+    }
+
+    /**
+     * Checks whether at least one partition of any group has the given history reservation flag,
+     * i.e. whether the corresponding log message would contain any partition.
+     *
+     * @param supplyInfoMap Cache or group name to partitions which have no historical supplier.
+     * @param reserved Expected value of {@link SupplyPartitionInfo#isHistoryReserved()}.
+     * @return {@code True} if a partition with a matching flag exists, {@code false} otherwise.
+     */
+    private boolean hasPartitionToLog(Map<String, List<SupplyPartitionInfo>> supplyInfoMap, boolean reserved) {
+        // Flatten all groups into a single stream of partitions;
+        // anyMatch stops at the first partition whose flag matches.
+        boolean found = supplyInfoMap.values().stream()
+            .flatMap(List::stream)
+            .anyMatch(info -> info.isHistoryReserved() == reserved);
+
+        return found;
+    }
+
+    /**
      * Removes gaps in the local update counters. Gaps in update counters are possible on backup node when primary
      * failed to send update counter deltas to backup.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/SupplyPartitionInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/SupplyPartitionInfo.java
new file mode 100644
index 0000000..11eca52
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/SupplyPartitionInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Information about historical supply of a specific partition: the demanded counter and the deepest reservation.
+ */
+public class SupplyPartitionInfo {
+    /**
+     * Partition number.
+     */
+    private final int part;
+
+    /**
+     * Partition counter on the demander node.
+     */
+    private final long minCntr;
+
+    /**
+     * Counter of the deepest history reservation, {@link Long#MAX_VALUE} if there is none.
+     */
+    private final long maxReserved;
+
+    /**
+     * Node with the deepest history reservation for the partition, {@code null} if there is none.
+     */
+    private final UUID maxReservedNodeId;
+
+    /**
+     * @param part Partition number.
+     * @param minCntr Partition counter on the demander node.
+     * @param maxReserved Counter of the deepest history reservation.
+     * @param maxReservedNodeId Node with the deepest history reservation.
+     */
+    public SupplyPartitionInfo(int part, long minCntr, long maxReserved, UUID maxReservedNodeId) {
+        this.part = part;
+        this.minCntr = minCntr;
+        this.maxReserved = maxReserved;
+        this.maxReservedNodeId = maxReservedNodeId;
+    }
+
+    /**
+     * @return Partition number.
+     */
+    public int part() {
+        return part;
+    }
+
+    /**
+     * @return Partition counter on the demander node.
+     */
+    public long minCntr() {
+        return minCntr;
+    }
+
+    /**
+     * @return Counter of the deepest history reservation.
+     */
+    public long maxReserved() {
+        return maxReserved;
+    }
+
+    /**
+     * @return Node with the deepest history reservation, or {@code null}.
+     */
+    public UUID maxReservedNodeId() {
+        return maxReservedNodeId;
+    }
+
+    /**
+     * @return {@code True} if a history reservation was found for the partition, {@code false} otherwise.
+     */
+    public boolean isHistoryReserved() {
+        return maxReserved != Long.MAX_VALUE && maxReservedNodeId != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SupplyPartitionInfo.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 4b01715..75af035 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -135,6 +135,7 @@ import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkp
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgressImpl;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.PartitionDestroyQueue;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.PartitionDestroyRequest;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.ReservationReason;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
@@ -1737,7 +1738,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         Map</*grpId*/Integer, Set</*partId*/Integer>> applicableGroupsAndPartitions = partitionsApplicableForWalRebalance();
 
-        Map</*grpId*/Integer, Map</*partId*/Integer, CheckpointEntry>> earliestValidCheckpoints;
+        Map</*grpId*/Integer, T2</*reason*/ReservationReason, Map</*partId*/Integer, CheckpointEntry>>> earliestValidCheckpoints;
 
         checkpointReadLock();
 
@@ -1750,10 +1751,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         Map</*grpId*/Integer, Map</*partId*/Integer, /*updCntr*/Long>> grpPartsWithCnts = new HashMap<>();
 
-        for (Map.Entry<Integer, Map<Integer, CheckpointEntry>> e : earliestValidCheckpoints.entrySet()) {
+        for (Map.Entry<Integer, T2</*reason*/ReservationReason, Map</*partId*/Integer, CheckpointEntry>>> e : earliestValidCheckpoints.entrySet()) {
             int grpId = e.getKey();
 
-            for (Map.Entry<Integer, CheckpointEntry> e0 : e.getValue().entrySet()) {
+            if (e.getValue().get2() == null)
+                continue;
+
+            for (Map.Entry<Integer, CheckpointEntry> e0 : e.getValue().get2().entrySet()) {
                 CheckpointEntry cpEntry = e0.getValue();
 
                 int partId = e0.getKey();
@@ -1772,10 +1776,65 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             }
         }
 
+        if (log.isInfoEnabled() && !F.isEmpty(earliestValidCheckpoints))
+            printReservationToLog(earliestValidCheckpoints);
+
         return grpPartsWithCnts;
     }
 
     /**
+     * Prints detailed information about cache groups which were not reserved
+     * and the reservation depth of cache groups which have enough WAL history.
+     *
+     * @param earliestValidCheckpoints Group id to (reason, map of partition id to earliest valid checkpoint).
+     */
+    private void printReservationToLog(
+        Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> earliestValidCheckpoints) {
+        try {
+            Map<ReservationReason, List<Integer>> notReservedCachesToPrint = new HashMap<>();
+            Map<ReservationReason, List<T2<Integer, CheckpointEntry>>> reservedCachesToPrint = new HashMap<>();
+
+            for (Map.Entry<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> entry : earliestValidCheckpoints.entrySet()) {
+                if (entry.getValue().get2() == null) {
+                    notReservedCachesToPrint.computeIfAbsent(entry.getValue().get1(), reason -> new ArrayList<>())
+                        .add(entry.getKey());
+                }
+                else {
+                    reservedCachesToPrint.computeIfAbsent(entry.getValue().get1(), reason -> new ArrayList<>())
+                        .add(new T2<>(entry.getKey(), entry.getValue().get2().values().stream().min(
+                            Comparator.comparingLong(CheckpointEntry::timestamp)).get()));
+                }
+            }
+
+            if (!F.isEmpty(notReservedCachesToPrint)) {
+                log.info("Cache groups were not reserved [" +
+                    notReservedCachesToPrint.entrySet().stream()
+                        .map(entry -> '[' +
+                            entry.getValue().stream().map(grpId -> "[grpId=" + grpId +
+                                ", grpName=" + cctx.cache().cacheGroup(grpId).cacheOrGroupName() + ']')
+                                .collect(Collectors.joining(", ")) +
+                            ", reason=" + entry.getKey() + ']')
+                        .collect(Collectors.joining(", ")) + ']');
+            }
+
+            if (!F.isEmpty(reservedCachesToPrint)) {
+                log.info("Cache groups with earliest reserved checkpoint and a reason why a previous checkpoint was inapplicable: [" +
+                    reservedCachesToPrint.entrySet().stream()
+                        .map(entry -> '[' +
+                            entry.getValue().stream().map(grpCp -> "[grpId=" + grpCp.get1() +
+                                ", grpName=" + cctx.cache().cacheGroup(grpCp.get1()).cacheOrGroupName() +
+                                ", cp=(" + grpCp.get2().checkpointId() + ", " + U.format(grpCp.get2().timestamp()) + ")]")
+                                .collect(Collectors.joining(", ")) +
+                            ", reason=" + entry.getKey() + ']')
+                        .collect(Collectors.joining(", ")) + ']');
+            }
+        }
+        catch (Exception e) {
+            log.error("An error happened during printing partitions that were reserved for potential historical rebalance.", e);
+        }
+    }
+
+    /**
      * @return Map of group id -> Set of partitions which can be used as suppliers for WAL rebalance.
      */
     private Map<Integer, Set<Integer>> partitionsApplicableForWalRebalance() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 131e379..dfe3798 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -550,19 +550,22 @@ public class CheckpointHistory {
      *
      * @param groupsAndPartitions Groups and partitions to find and reserve earliest valid checkpoint.
      *
-     * @return Map (groupId: Map (partitionId, earliest valid checkpoint to history search)).
+     * @return Map (groupId, Reason (the reason why reservation cannot be made deeper): Map
+     * (partitionId, earliest valid checkpoint to history search)).
      */
-    public Map<Integer, Map<Integer, CheckpointEntry>> searchAndReserveCheckpoints(
+    public Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> searchAndReserveCheckpoints(
         final Map<Integer, Set<Integer>> groupsAndPartitions
     ) {
         if (F.isEmpty(groupsAndPartitions))
             return Collections.emptyMap();
 
-        final Map<Integer, Map<Integer, CheckpointEntry>> res = new HashMap<>();
+        final Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> res = new HashMap<>();
 
         CheckpointEntry oldestCpForReservation = null;
 
         synchronized (earliestCp) {
+            CheckpointEntry oldestHistoryCpEntry = firstCheckpoint();
+
             for (Integer grpId : groupsAndPartitions.keySet()) {
                 CheckpointEntry oldestGrpCpEntry = null;
 
@@ -579,17 +582,23 @@ public class CheckpointHistory {
                         oldestGrpCpEntry = cpEntry;
 
                     res.computeIfAbsent(grpId, partCpMap ->
-                        new HashMap<>())
-                        .put(part, cpEntry);
+                        new T2<>(ReservationReason.NO_MORE_HISTORY, new HashMap<>()))
+                        .get2().put(part, cpEntry);
                 }
+
+                if (oldestGrpCpEntry == null || oldestGrpCpEntry != oldestHistoryCpEntry)
+                    res.computeIfAbsent(grpId, (partCpMap) ->
+                        new T2<>(ReservationReason.CHECKPOINT_NOT_APPLICABLE, null))
+                        .set1(ReservationReason.CHECKPOINT_NOT_APPLICABLE);
             }
+        }
 
-            if (oldestCpForReservation != null) {
-                if (!cctx.wal().reserve(oldestCpForReservation.checkpointMark())) {
-                    log.warning("Could not reserve cp " + oldestCpForReservation.checkpointMark());
+        if (oldestCpForReservation != null) {
+            if (!cctx.wal().reserve(oldestCpForReservation.checkpointMark())) {
+                log.warning("Could not reserve cp " + oldestCpForReservation.checkpointMark());
 
-                    return Collections.emptyMap();
-                }
+                for (Map.Entry<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> entry : res.entrySet())
+                    entry.setValue(new T2<>(ReservationReason.WAL_RESERVATION_ERROR, null));
             }
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/ReservationReason.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/ReservationReason.java
new file mode 100644
index 0000000..5f12e45
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/ReservationReason.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
+
+/**
+ * Reason why the WAL history reserved for a cache group is bounded.
+ */
+public enum ReservationReason {
+    /** An exception happened while reading a WAL segment,
+     * or a segment could not be reserved. */
+    WAL_RESERVATION_ERROR("Unexpected error during processing of previous checkpoint"),
+
+    /** No more history is reserved for the cache group. */
+    NO_MORE_HISTORY("Reserved checkpoint is the oldest in history"),
+
+    /** A checkpoint in the reserved history cannot be applied to the cache group. */
+    CHECKPOINT_NOT_APPLICABLE("Checkpoint was marked as inapplicable for historical rebalancing");
+
+    /** Description of the reason printed to the log. */
+    private final String desc;
+
+    /**
+     * Creates a reason with the given log description.
+     *
+     * @param desc Description of the reason printed to the log.
+     */
+    ReservationReason(String desc) {
+        this.desc = desc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return desc;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
new file mode 100644
index 0000000..9b21b49
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.internal.pagemem.wal.record.RolloverType.CURRENT_SEGMENT;
+
+/**
+ * Tests for checking historical rebalance log messages.
+ */
+public class IgniteWalRebalanceLoggingTest extends GridCommonAbstractTest {
+    /** Checkpoint frequency big enough to prevent checkpoints triggered by timeout during a test. */
+    private static final int CHECKPOINT_FREQUENCY = 600_000;
+
+    /** Listening logger installed as the grid logger of every started node, so tests can match log messages. */
+    private final ListeningTestLogger srvLog = new ListeningTestLogger(false, log);
+
+    /** Exclusive bound of the first key-loading loop, which runs right after the caches are created. */
+    private static final int KEYS_LOW_BORDER = 100;
+
+    /** Upper key border; presumably bounds the keys added after the second node is stopped (TODO confirm). */
+    private static final int KEYS_UPPER_BORDER = 200;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setGridLogger(srvLog); // Every node logs to the listening logger, so tests can match its messages.
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+        storageCfg.getDefaultDataRegionConfiguration()
+            .setMaxSize(200L * 1024 * 1024) // 200 MiB.
+            .setPersistenceEnabled(true);
+
+        storageCfg.setWalMode(WALMode.LOG_ONLY)
+            .setMaxWalArchiveSize(-1) // NOTE(review): presumably -1 means an unlimited WAL archive; confirm.
+            .setWalCompactionEnabled(true)
+            .setWalCompactionLevel(1);
+
+        storageCfg.setCheckpointFrequency(CHECKPOINT_FREQUENCY); // Prevents checkpoints triggered by timeout.
+
+        cfg.setDataStorageConfiguration(storageCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Checks that appropriate messages are logged in case of historical rebalance.
+     * <p>
+     *     <b>Steps:</b>
+     *     <ol>
+     *         <li>Set IGNITE_PDS_WAL_REBALANCE_THRESHOLD to 1.</li>
+     *         <li>Start two nodes.</li>
+     *         <li>Create two caches, each in its own cache group, and populate them with some data.</li>
+     *         <li>Stop the second node and add more data to both caches.</li>
+     *         <li>Wait for twice the checkpoint frequency. This is required to guarantee that at least one checkpoint
+     *         is created.</li>
+     *         <li>Start the previously stopped node and wait for PME.</li>
+     *     </ol>
+     * <p>
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "1")
+    public void testHistoricalRebalanceLogMsg() throws Exception {
+        LogListener expMsgsLsnr = LogListener.matches(str ->
+            str.startsWith("Cache groups with earliest reserved checkpoint and a reason why a previous checkpoint was inapplicable:") &&
+                str.contains("cache_group1") && str.contains("cache_group2")).times(3).
+            andMatches(str -> str.startsWith("Starting rebalance routine") &&
+                (str.contains("cache_group1") || str.contains("cache_group2")) &&
+                str.contains("fullPartitions=[], histPartitions=[0-7]")).times(2).build(); // Presumably one per group.
+
+        LogListener unexpectedMessagesLsnr = LogListener.matches((str) ->
+            str.startsWith("Partitions weren't present in any history reservation:") ||
+                str.startsWith("Partitions were reserved, but maximum available counter is greater than demanded:")
+        ).build();
+
+        checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr, unexpectedMessagesLsnr);
+
+        assertTrue(expMsgsLsnr.check());
+        assertFalse(unexpectedMessagesLsnr.check());
+    }
+
+    /**
+     * Checks that appropriate messages are logged in case of full rebalance.
+     * <p>
+     *     <b>Steps:</b>
+     *     <ol>
+     *         <li>Set IGNITE_PDS_WAL_REBALANCE_THRESHOLD to its default value 500000.</li>
+     *         <li>Start two nodes.</li>
+     *         <li>Create two caches, each in its own cache group, and populate them with some data.</li>
+     *         <li>Stop the second node and add more data to both caches.</li>
+     *         <li>Wait for twice the checkpoint frequency. This is required to guarantee that at least one checkpoint
+     *         is created.</li>
+     *         <li>Start the previously stopped node and wait for PME.</li>
+     *     </ol>
+     * <p>Note: the exact expected message relies on the order in which cache groups are logged.
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "500000")
+    public void testFullRebalanceLogMsgs() throws Exception {
+        LogListener expMsgsLsnr = LogListener.
+            matches("Partitions weren't present in any history reservation: " +
+                "[[grp=cache_group2 part=[[0-7]]], [grp=cache_group1 part=[[0-7]]]]").
+            andMatches(str -> str.startsWith("Starting rebalance routine") &&
+                (str.contains("cache_group1") || str.contains("cache_group2")) &&
+                str.contains("fullPartitions=[0-7], histPartitions=[]")).times(2).build();
+
+        checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr);
+
+        assertTrue(expMsgsLsnr.check());
+    }
+
+    /**
+     * Checks log messages when the checkpoint history is short.
+     * <p>
+     * The in-memory checkpoint history size (IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE) is set to 2 and
+     * the WAL rebalance threshold to 1. The restarted node is expected to log that partitions of both groups
+     * were reserved but the maximum available counter is greater than demanded, and then to be rebalanced fully.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, value = "2")
+    @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "1")
+    public void testFullRebalanceWithShortCpHistoryLogMsgs() throws Exception {
+        String reservedPrefix =
+            "Partitions were reserved, but maximum available counter is greater than demanded: ";
+        String rebalancePrefix = "Starting rebalance routine";
+
+        LogListener expMsgsLsnr = LogListener.
+            matches(msg -> msg.startsWith(reservedPrefix) &&
+                msg.contains("grp=cache_group1") &&
+                msg.contains("grp=cache_group2")).
+            andMatches(msg -> msg.startsWith(rebalancePrefix) &&
+                (msg.contains("cache_group1") || msg.contains("cache_group2")) &&
+                msg.contains("fullPartitions=[0-7], histPartitions=[]")).
+            times(2).
+            build();
+
+        checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(expMsgsLsnr);
+
+        assertTrue(expMsgsLsnr.check());
+    }
+
+    /**
+     * Test utility method.
+     * <p>
+     * Starts two server nodes and activates the cluster. It fills two caches (each in its own cache group),
+     * stops node 1 and writes more data. It then registers the given listeners with the server logger,
+     * restarts node 1 and waits for partition map exchange.
+     *
+     * @param lsnrs Listeners to register with server logger.
+     * @throws Exception If failed.
+     */
+    private void checkFollowingPartitionsWereReservedForPotentialHistoryRebalanceMsg(LogListener... lsnrs)
+        throws Exception {
+        Ignite crd = startGridsMultiThreaded(2);
+
+        crd.cluster().active(true);
+
+        IgniteCache<Integer, String> cache1 = createCache("cache1", "cache_group1");
+        IgniteCache<Integer, String> cache2 = createCache("cache2", "cache_group2");
+
+        // Data written while both nodes are alive.
+        populateCachesWithCheckpoints(cache1, cache2, 0, KEYS_LOW_BORDER);
+
+        stopGrid(1);
+
+        // Updates missed by the stopped node.
+        populateCachesWithCheckpoints(cache1, cache2, KEYS_LOW_BORDER, KEYS_UPPER_BORDER);
+
+        // Listeners are registered right before node 1 is restarted, so they capture the messages it triggers.
+        srvLog.clearListeners();
+
+        for (LogListener lsnr : lsnrs)
+            srvLog.registerListener(lsnr);
+
+        startGrid(1);
+
+        awaitPartitionMapExchange(false, true, null);
+    }
+
+    /**
+     * Puts {@code "abc" + key} into both caches for every key in {@code [from, to)}. A checkpoint and a WAL
+     * rollover are forced whenever the key is a multiple of 20.
+     *
+     * @param cache1 First cache.
+     * @param cache2 Second cache.
+     * @param from First key, inclusive.
+     * @param to Last key, exclusive.
+     * @throws Exception If failed.
+     */
+    private void populateCachesWithCheckpoints(IgniteCache<Integer, String> cache1,
+        IgniteCache<Integer, String> cache2, int from, int to) throws Exception {
+        for (int key = from; key < to; key++) {
+            String val = "abc" + key;
+
+            cache1.put(key, val);
+            cache2.put(key, val);
+
+            if (key % 20 == 0)
+                forceCheckpointAndRollOwerWal();
+        }
+    }
+
+    /**
+     * Creates, on node 0, a cache with 8 partitions and 1 backup that belongs to the given cache group.
+     *
+     * @param cacheName Cache name.
+     * @param cacheGrpName Cache group name.
+     * @return Created cache.
+     */
+    private IgniteCache<Integer, String> createCache(String cacheName, String cacheGrpName) {
+        CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<Integer, String>(cacheName).
+            setAffinity(new RendezvousAffinityFunction().setPartitions(8)).
+            setGroupName(cacheGrpName).
+            setBackups(1);
+
+        return ignite(0).createCache(ccfg);
+    }
+
+    /**
+     * Forces a checkpoint and then, on every server node, logs an ad-hoc record with
+     * {@code CURRENT_SEGMENT} rollover to roll the WAL over to a new segment.
+     * It may be needed to simulate a long checkpoint history in a test.
+     *
+     * @throws Exception If failed.
+     */
+    private void forceCheckpointAndRollOwerWal() throws Exception {
+        forceCheckpoint();
+
+        for (Ignite ignite : G.allGrids()) {
+            // Client nodes are skipped.
+            if (ignite.cluster().localNode().isClient())
+                continue;
+
+            IgniteEx ig = (IgniteEx)ignite;
+
+            IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal();
+
+            // The record is logged under the checkpoint read lock, like regular updates
+            // (presumably required by the WAL manager; confirm).
+            ig.context().cache().context().database().checkpointReadLock();
+
+            try {
+                // The returned pointer is not needed; only the rollover side effect matters here.
+                walMgr.log(new AdHocWALRecord(), CURRENT_SEGMENT);
+            }
+            finally {
+                ig.context().cache().context().database().checkpointReadUnlock();
+            }
+        }
+    }
+
+    /** Test WAL record, logged only to trigger a WAL segment rollover. */
+    private static class AdHocWALRecord extends CheckpointRecord {
+        /** Creates the record, passing {@code null} to the parent constructor. */
+        private AdHocWALRecord() {
+            super(null);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
index 38f93c6..a17e889 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWALT
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFormatFileFailoverTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorExceptionDuringReadTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceLoggingTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
@@ -93,6 +94,7 @@ public class IgnitePdsMvccTestSuite2 {
         ignoredTests.add(StandaloneWalRecordsIteratorTest.class);
         ignoredTests.add(IgniteWALTailIsReachedDuringIterationOverArchiveTest.class);
         ignoredTests.add(WalRolloverTypesTest.class);
+        ignoredTests.add(IgniteWalRebalanceLoggingTest.class);
         ignoredTests.add(FsyncWalRolloverDoesNotBlockTest.class);
 
         return IgnitePdsTestSuite2.suite(ignoredTests);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index b5a314f..2f065c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorExceptionDuringReadTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceLoggingTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalReplayingAfterRestartTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest;
@@ -259,5 +260,7 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlDeactivateOnHighloadTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, WalPreloadingConcurrentTest.class, ignoredTests);
+
+        GridTestUtils.addTestIfNeeded(suite, IgniteWalRebalanceLoggingTest.class, ignoredTests);
     }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java
index 2e63fdb..57021aa 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/client/IgniteDataStreamerTest.java
@@ -46,6 +46,8 @@ public class IgniteDataStreamerTest extends GridCommonAbstractTest {
 
         startGrids(2);
         startClientGrid("client");
+
+        awaitPartitionMapExchange();
     }
 
     @Override protected void afterTest() throws Exception {