You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/08/06 22:12:14 UTC

[ignite] branch master updated: IGNITE-15208 Remove unnecessary rebalance order classes from exchange thread (#9285)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c325627  IGNITE-15208 Remove unnecessary rebalance order classes from exchange thread (#9285)
c325627 is described below

commit c32562791f777aa991634caa708845e8cc6d3e80
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Sat Aug 7 01:11:44 2021 +0300

    IGNITE-15208 Remove unnecessary rebalance order classes from exchange thread (#9285)
---
 .../cache/GridCachePartitionExchangeManager.java   | 205 ++++++---------------
 .../processors/cache/GridCachePreloader.java       |  26 +--
 .../cache/GridCachePreloaderAdapter.java           |  24 +--
 .../dht/preloader/GridDhtPreloader.java            |  24 +--
 .../cache/CacheGroupsMetricsRebalanceTest.java     |  36 +---
 .../TxPartitionCounterStateConsistencyTest.java    |   2 -
 6 files changed, 82 insertions(+), 235 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2d5626a..c4662a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -22,16 +22,17 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
@@ -102,7 +104,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
@@ -112,7 +113,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.lat
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -305,9 +305,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Histogram of blocking PME durations. */
     private volatile HistogramMetricImpl blockingDurationHistogram;
 
-    /** Delay before rebalancing code is start executing after exchange completion. For tests only. */
-    private volatile long rebalanceDelay;
-
     /** Metric that shows whether cluster is in fully rebalanced state. */
     private volatile BooleanMetricImpl rebalanced;
 
@@ -2648,13 +2645,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param delay Rebalance delay.
-     */
-    public void rebalanceDelay(long delay) {
-        this.rebalanceDelay = delay;
-    }
-
-    /**
      * For testing only.
      *
      * @return Current version to wait for.
@@ -3289,10 +3279,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         continue;
                     }
 
-                    Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
-
-                    boolean forcePreload = false;
-
                     GridDhtPartitionExchangeId exchId;
 
                     GridDhtPartitionsExchangeFuture exchFut = null;
@@ -3335,8 +3321,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                          }
                     }
                     else if (task instanceof ForceRebalanceExchangeTask) {
-                        forcePreload = true;
-
                         timeout = 0; // Force refresh.
 
                         exchId = ((ForceRebalanceExchangeTask)task).exchangeId();
@@ -3485,84 +3469,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     }
 
                     if (rebalanceRequired(exchFut)) {
-                        if (rebalanceDelay > 0)
-                            U.sleep(rebalanceDelay);
+                        NavigableSet<CacheGroupContext> assignsSet = cctx.cache().cacheGroups().stream()
+                            .collect(Collectors.toCollection(() -> new TreeSet<>(new CacheRebalanceOrderComparator())));
 
-                        assignsMap = new HashMap<>();
-
-                        IgniteCacheSnapshotManager snp = cctx.snapshot();
-
-                        for (final CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                            long delay = grp.config().getRebalanceDelay();
-
-                            boolean disableRebalance = snp.partitionsAreFrozen(grp);
-
-                            GridDhtPreloaderAssignments assigns = null;
-
-                            // Don't delay for dummy reassigns to avoid infinite recursion.
-                            if ((delay == 0 || forcePreload) && !disableRebalance)
-                                assigns = grp.preloader().generateAssignments(exchId, exchFut);
-
-                            assignsMap.put(grp.groupId(), assigns);
-
-                            if (resVer == null && !grp.isLocal())
-                                resVer = grp.topology().readyTopologyVersion();
-                        }
-                    }
-
-                    if (resVer == null)
-                        resVer = exchId.topologyVersion();
-
-                    if (!F.isEmpty(assignsMap)) {
-                        int size = assignsMap.size();
-
-                        NavigableMap<CacheRebalanceOrder, List<Integer>> orderMap = new TreeMap<>();
-
-                        for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
-                            int grpId = e.getKey();
-
-                            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
-                            CacheRebalanceOrder order = new CacheRebalanceOrder(
-                                grp.config().getRebalanceOrder(),
-                                grp.config().getRebalanceMode());
-
-                            if (orderMap.get(order) == null)
-                                orderMap.put(order, new ArrayList<Integer>(size));
-
-                            orderMap.get(order).add(grpId);
-                        }
-
-                        RebalanceFuture r = null;
+                        RebalanceFuture next = null;
 
                         GridCompoundFuture<Boolean, Boolean> rebFut = new GridCompoundFuture<>();
 
-                        ArrayList<String> rebList = new ArrayList<>(size);
-
                         GridCompoundFuture<Boolean, Boolean> forcedRebFut = null;
 
                         if (task instanceof ForceRebalanceExchangeTask)
                             forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
 
-                        for (CacheRebalanceOrder order : orderMap.descendingKeySet()) {
-                            for (Integer grpId : orderMap.get(order)) {
-                                CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+                        for (CacheGroupContext grp : assignsSet.descendingSet()) {
+                            boolean disableRebalance = cctx.snapshot().partitionsAreFrozen(grp);
 
-                                GridDhtPreloaderAssignments assigns = assignsMap.get(grpId);
-
-                                RebalanceFuture cur = grp.preloader().addAssignments(assigns,
-                                    forcePreload,
-                                    cnt,
-                                    r,
-                                    forcedRebFut,
-                                    rebFut);
+                            if (disableRebalance)
+                                continue;
 
-                                if (cur != null) {
-                                    rebList.add(grp.cacheOrGroupName());
+                            RebalanceFuture cur = grp.preloader().prepare(exchId,
+                                exchFut,
+                                cnt,
+                                next,
+                                forcedRebFut,
+                                rebFut);
 
-                                    r = cur;
-                                }
-                            }
+                            if (cur != null)
+                                next = cur;
                         }
 
                         rebFut.markInitialized();
@@ -3570,15 +3503,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         if (forcedRebFut != null)
                             forcedRebFut.markInitialized();
 
-                        if (r != null) {
-                            Collections.reverse(rebList);
-
-                            RebalanceFuture finalR = r;
+                        if (next != null) {
+                            RebalanceFuture finalR = next;
 
                             // Waits until compatible rebalances are finished.
                             // Start rebalancing cache groups chain. Each group will be rebalanced
                             // sequentially one by one e.g.:
                             // ignite-sys-cache -> cacheGroupR1 -> cacheGroupP2 -> cacheGroupR3
+                            List<String> rebList = assignsSet.stream().map(CacheGroupContext::cacheOrGroupName)
+                                .collect(Collectors.toList());
+
                             long rebId = cnt;
 
                             rebFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@@ -3594,6 +3528,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             });
                         }
                         else {
+                            resVer = resVer == null ? assignsSet.stream()
+                                .filter(g -> !g.isLocal())
+                                .map(g -> g.topology().readyTopologyVersion())
+                                .filter(Objects::nonNull)
+                                .findFirst()
+                                .orElse(exchId.topologyVersion()) : resVer;
+
                             U.log(log, "Skipping rebalancing (nothing scheduled) " +
                                 "[top=" + resVer + ", force=" + (exchFut == null) +
                                 ", evt=" + exchId.discoveryEventName() +
@@ -3602,7 +3543,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     }
                     else {
                         U.log(log, "Skipping rebalancing (no affinity changes) " +
-                            "[top=" + resVer +
+                            "[top=" + (resVer == null ? exchId.topologyVersion() : resVer) +
                             ", evt=" + exchId.discoveryEventName() +
                             ", evtNode=" + exchId.nodeId() +
                             ", client=" + cctx.kernalContext().clientNode() + ']');
@@ -3638,10 +3579,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         /**
          * Rebalance is not required on a client node and is always required when the exchange future is null.
-         * In other cases, this method checks all caches and decides whether rebalancing is required or not
-         * for the specific exchange.
          *
-         * @param exchFut Exchange future.
+         * @param exchFut Exchange future, or {@code null} if this is a forced rebalance task.
          * @return {@code True} if rebalance is required at least for one of cache groups.
          */
         private boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
@@ -3651,15 +3590,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             if (exchFut == null)
                 return true;
 
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (grp.isLocal())
-                    continue;
-
-                if (grp.preloader().rebalanceRequired(exchFut))
-                    return true;
-            }
-
-            return false;
+            return lastAffinityChangedTopologyVersion(exchFut.topologyVersion()).equals(exchFut.topologyVersion());
         }
     }
 
@@ -4028,61 +3959,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * Represents a cache rebalance order that takes into account both values: rebalance order itself and rebalance mode.
      * It is assumed SYNC caches should be rebalanced in the first place.
      */
-    private static class CacheRebalanceOrder implements Comparable<CacheRebalanceOrder> {
-        /** Cache rebalance order. */
-        private int order;
-
-        /** Cache rebalance mode. */
-        private CacheRebalanceMode mode;
-
-        /**
-         * Creates a new instance of CacheRebalanceOrder.
-         *
-         * @param order Cache rebalance order.
-         * @param mode Cache rebalance mode.
-         */
-        public CacheRebalanceOrder(int order, CacheRebalanceMode mode) {
-            this.order = order;
-            this.mode = mode;
-        }
-
+    private static class CacheRebalanceOrderComparator implements Comparator<CacheGroupContext> {
         /** {@inheritDoc} */
-        @Override public int compareTo(@NotNull CacheRebalanceOrder o) {
-            if (order == o.order) {
-                if (mode == o.mode)
-                    return 0;
-
-                switch (mode) {
-                    case SYNC: return -1;
-                    case ASYNC: return o.mode == CacheRebalanceMode.SYNC ? 1 : -1;
-                    case NONE: return 1;
+        @Override public int compare(CacheGroupContext ctx1, CacheGroupContext ctx2) {
+            CacheConfiguration<?, ?> cfg1 = ctx1.config();
+            CacheConfiguration<?, ?> cfg2 = ctx2.config();
+
+            if (cfg1.getRebalanceOrder() == cfg2.getRebalanceOrder()) {
+                if (cfg1.getRebalanceMode() == cfg2.getRebalanceMode())
+                    return ctx1.cacheOrGroupName().compareTo(ctx2.cacheOrGroupName());
+
+                switch (cfg1.getRebalanceMode()) {
+                    case SYNC:
+                        return -1;
+                    case ASYNC:
+                        return cfg2.getRebalanceMode() == CacheRebalanceMode.SYNC ? 1 : -1;
+                    case NONE:
+                        return 1;
                     default:
-                        throw new IllegalArgumentException("Unknown cache rebalance mode [mode=" + mode + ']');
+                        throw new IllegalArgumentException("Unknown cache rebalance mode [mode=" + cfg1.getRebalanceMode() + ']');
                 }
             }
             else
-                return (order < o.order) ? -1 : 1;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            CacheRebalanceOrder order1 = (CacheRebalanceOrder)o;
-
-            if (order != order1.order)
-                return false;
-            return mode == order1.mode;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int result = order;
-            result = 31 * result + mode.hashCode();
-            return result;
+                return (cfg1.getRebalanceOrder() < cfg2.getRebalanceOrder()) ? -1 : 1;
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 4da69ca..82bf6d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -26,13 +26,11 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.jetbrains.annotations.Nullable;
 
@@ -66,33 +64,17 @@ public interface GridCachePreloader {
     public void onInitialExchangeComplete(@Nullable Throwable err);
 
     /**
-     * @param exchFut Completed exchange future.
-     * @return {@code True} if rebalance should be started (previous will be interrupted).
-     */
-    public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut);
-
-    /**
      * @param exchId Exchange ID.
      * @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs.
-     * @return Partition assignments which will be requested from supplier nodes.
-     */
-    @Nullable public GridDhtPreloaderAssignments generateAssignments(
-        GridDhtPartitionExchangeId exchId,
-        @Nullable GridDhtPartitionsExchangeFuture exchFut);
-
-    /**
-     * Adds assignments to preloader.
-     *
-     * @param assignments Assignments to add.
-     * @param forcePreload {@code True} if preload requested by {@link ForceRebalanceExchangeTask}.
      * @param rebalanceId Rebalance id created by exchange thread.
-     * @param next Rebalance's future follows after the current one.
+     * @param next Rebalance future that follows the current one.
      * @param forcedRebFut External future for forced rebalance.
      * @param compatibleRebFut Future for waiting for compatible rebalances.
      * @return Future if rebalance was planned or null.
      */
-    public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments,
-        boolean forcePreload,
+    public RebalanceFuture prepare(
+        GridDhtPartitionExchangeId exchId,
+        @Nullable GridDhtPartitionsExchangeFuture exchFut,
         long rebalanceId,
         final RebalanceFuture next,
         @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 31d209b..2078204 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -138,25 +138,27 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
         // No-op.
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments generateAssignments(
+    /**
+     * @param exchId Exchange ID.
+     * @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs.
+     * @return Partition assignments which will be requested from supplier nodes.
+     */
+    public GridDhtPreloaderAssignments generateAssignments(
         GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut) {
+        GridDhtPartitionsExchangeFuture exchFut
+    ) {
         return null;
     }
 
     /** {@inheritDoc} */
-    @Override public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments,
-        boolean forcePreload,
+    @Override public RebalanceFuture prepare(
+        GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionsExchangeFuture exchFut,
         long rebalanceId,
         RebalanceFuture next,
         @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
-        GridCompoundFuture<Boolean, Boolean> compatibleRebFut) {
+        GridCompoundFuture<Boolean, Boolean> compatibleRebFut
+    ) {
         return null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 8c3b446..21f2652 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -159,17 +159,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
-        if (ctx.kernalContext().clientNode())
-            return false; // No-op.
-
-        AffinityTopologyVersion lastAffChangeTopVer =
-            ctx.exchange().lastAffinityChangedTopologyVersion(exchFut.topologyVersion());
-
-        return lastAffChangeTopVer.equals(exchFut.topologyVersion());
-    }
-
-    /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments generateAssignments(
         GridDhtPartitionExchangeId exchId,
         GridDhtPartitionsExchangeFuture exchFut
@@ -378,15 +367,20 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public RebalanceFuture addAssignments(
-        GridDhtPreloaderAssignments assignments,
-        boolean forceRebalance,
+    @Override public RebalanceFuture prepare(
+        GridDhtPartitionExchangeId exchId,
+        @Nullable GridDhtPartitionsExchangeFuture exchFut,
         long rebalanceId,
         final RebalanceFuture next,
         @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
         GridCompoundFuture<Boolean, Boolean> compatibleRebFut
     ) {
-        return demander.addAssignments(assignments, forceRebalance, rebalanceId, next, forcedRebFut, compatibleRebFut);
+        long delay = grp.config().getRebalanceDelay();
+        boolean forceRebalance = forcedRebFut != null;
+
+        // Don't delay for dummy reassigns to avoid infinite recursion.
+        return (delay == 0 || forceRebalance) ? demander.addAssignments(generateAssignments(exchId, exchFut), forceRebalance,
+            rebalanceId, next, forcedRebFut, compatibleRebFut) : null;
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index 4635830..b1fe03e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
-import com.google.common.collect.Lists;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
@@ -48,7 +47,6 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.ObjectGauge;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -194,37 +192,13 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
                 cache2.put(i, CACHE2 + "-" + i);
         }
 
-        final CountDownLatch startStopRebalanceLatch = new CountDownLatch(1);
-        final CountDownLatch finishStopRebalanceLatch = new CountDownLatch(1);
-        GridFutureAdapter<Void> stopRebalanceResFut = new GridFutureAdapter<>();
+        ignite = startGrid(1);
 
-        ignite = startGrid(1, cfg -> {
-            cfg.setLocalEventListeners(Collections.singletonMap(
-                (IgnitePredicate<Event>)evt -> {
-                    startStopRebalanceLatch.countDown();
-
-                    try {
-                        assertTrue(finishStopRebalanceLatch.await(getTestTimeout(), TimeUnit.SECONDS));
-
-                        stopRebalanceResFut.onDone();
-                    }
-                    catch (Throwable e) {
-                        stopRebalanceResFut.onDone(e);
-                    }
-
-                    return false;
-                },
-                new int[] {EventType.EVT_CACHE_REBALANCE_STOPPED}
-            ));
-        });
-
-        assertTrue(startStopRebalanceLatch.await(getTestTimeout(), TimeUnit.SECONDS));
+        awaitPartitionMapExchange(true, true, null, true);
 
         CacheMetrics metrics1 = ignite.cache(CACHE1).localMetrics();
         CacheMetrics metrics2 = ignite.cache(CACHE2).localMetrics();
 
-        finishStopRebalanceLatch.countDown();
-
         long rate1 = metrics1.getRebalancingKeysRate();
         long rate2 = metrics2.getRebalancingKeysRate();
 
@@ -234,8 +208,6 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
 
         assertEquals(metrics1.getRebalancedKeys(), rate1);
         assertEquals(metrics2.getRebalancedKeys(), rate2);
-
-        stopRebalanceResFut.get(getTestTimeout());
     }
 
     /**
@@ -245,7 +217,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
     public void testCacheGroupRebalance() throws Exception {
         IgniteEx ignite0 = startGrid(0);
 
-        List<String> cacheNames = Lists.newArrayList(CACHE4, CACHE5);
+        List<String> cacheNames = Arrays.asList(CACHE4, CACHE5);
 
         int allKeysCount = 0;
 
@@ -370,7 +342,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
 
         IgniteEx ignite0 = startGrid(0);
 
-        List<String> cacheNames = Lists.newArrayList(CACHE4, CACHE5);
+        List<String> cacheNames = Arrays.asList(CACHE4, CACHE5);
 
         for (String cacheName : cacheNames) {
             ignite0.getOrCreateCache(cacheName).putAll(new Random().ints(KEYS_COUNT).distinct().boxed()
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index e6f3bbaf..5c78e3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -1185,8 +1185,6 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
             grid(0).resetLostPartitions(Collections.singleton(DEFAULT_CACHE_NAME));
         }
 
-        prim.context().cache().context().exchange().rebalanceDelay(500);
-
         Random r = new Random();
 
         AtomicBoolean stop = new AtomicBoolean();