You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/08/06 22:12:14 UTC
[ignite] branch master updated: IGNITE-15208 Remove unnecessary
rebalance order classes from exchange thread (#9285)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c325627 IGNITE-15208 Remove unnecessary rebalance order classes from exchange thread (#9285)
c325627 is described below
commit c32562791f777aa991634caa708845e8cc6d3e80
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Sat Aug 7 01:11:44 2021 +0300
IGNITE-15208 Remove unnecessary rebalance order classes from exchange thread (#9285)
---
.../cache/GridCachePartitionExchangeManager.java | 205 ++++++---------------
.../processors/cache/GridCachePreloader.java | 26 +--
.../cache/GridCachePreloaderAdapter.java | 24 +--
.../dht/preloader/GridDhtPreloader.java | 24 +--
.../cache/CacheGroupsMetricsRebalanceTest.java | 36 +---
.../TxPartitionCounterStateConsistencyTest.java | 2 -
6 files changed, 82 insertions(+), 235 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2d5626a..c4662a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -22,16 +22,17 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
@@ -102,7 +104,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
@@ -112,7 +113,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.lat
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -305,9 +305,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** Histogram of blocking PME durations. */
private volatile HistogramMetricImpl blockingDurationHistogram;
- /** Delay before rebalancing code is start executing after exchange completion. For tests only. */
- private volatile long rebalanceDelay;
-
/** Metric that shows whether cluster is in fully rebalanced state. */
private volatile BooleanMetricImpl rebalanced;
@@ -2648,13 +2645,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param delay Rebalance delay.
- */
- public void rebalanceDelay(long delay) {
- this.rebalanceDelay = delay;
- }
-
- /**
* For testing only.
*
* @return Current version to wait for.
@@ -3289,10 +3279,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
continue;
}
- Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
-
- boolean forcePreload = false;
-
GridDhtPartitionExchangeId exchId;
GridDhtPartitionsExchangeFuture exchFut = null;
@@ -3335,8 +3321,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
else if (task instanceof ForceRebalanceExchangeTask) {
- forcePreload = true;
-
timeout = 0; // Force refresh.
exchId = ((ForceRebalanceExchangeTask)task).exchangeId();
@@ -3485,84 +3469,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (rebalanceRequired(exchFut)) {
- if (rebalanceDelay > 0)
- U.sleep(rebalanceDelay);
+ NavigableSet<CacheGroupContext> assignsSet = cctx.cache().cacheGroups().stream()
+ .collect(Collectors.toCollection(() -> new TreeSet<>(new CacheRebalanceOrderComparator())));
- assignsMap = new HashMap<>();
-
- IgniteCacheSnapshotManager snp = cctx.snapshot();
-
- for (final CacheGroupContext grp : cctx.cache().cacheGroups()) {
- long delay = grp.config().getRebalanceDelay();
-
- boolean disableRebalance = snp.partitionsAreFrozen(grp);
-
- GridDhtPreloaderAssignments assigns = null;
-
- // Don't delay for dummy reassigns to avoid infinite recursion.
- if ((delay == 0 || forcePreload) && !disableRebalance)
- assigns = grp.preloader().generateAssignments(exchId, exchFut);
-
- assignsMap.put(grp.groupId(), assigns);
-
- if (resVer == null && !grp.isLocal())
- resVer = grp.topology().readyTopologyVersion();
- }
- }
-
- if (resVer == null)
- resVer = exchId.topologyVersion();
-
- if (!F.isEmpty(assignsMap)) {
- int size = assignsMap.size();
-
- NavigableMap<CacheRebalanceOrder, List<Integer>> orderMap = new TreeMap<>();
-
- for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
- int grpId = e.getKey();
-
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
- CacheRebalanceOrder order = new CacheRebalanceOrder(
- grp.config().getRebalanceOrder(),
- grp.config().getRebalanceMode());
-
- if (orderMap.get(order) == null)
- orderMap.put(order, new ArrayList<Integer>(size));
-
- orderMap.get(order).add(grpId);
- }
-
- RebalanceFuture r = null;
+ RebalanceFuture next = null;
GridCompoundFuture<Boolean, Boolean> rebFut = new GridCompoundFuture<>();
- ArrayList<String> rebList = new ArrayList<>(size);
-
GridCompoundFuture<Boolean, Boolean> forcedRebFut = null;
if (task instanceof ForceRebalanceExchangeTask)
forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
- for (CacheRebalanceOrder order : orderMap.descendingKeySet()) {
- for (Integer grpId : orderMap.get(order)) {
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+ for (CacheGroupContext grp : assignsSet.descendingSet()) {
+ boolean disableRebalance = cctx.snapshot().partitionsAreFrozen(grp);
- GridDhtPreloaderAssignments assigns = assignsMap.get(grpId);
-
- RebalanceFuture cur = grp.preloader().addAssignments(assigns,
- forcePreload,
- cnt,
- r,
- forcedRebFut,
- rebFut);
+ if (disableRebalance)
+ continue;
- if (cur != null) {
- rebList.add(grp.cacheOrGroupName());
+ RebalanceFuture cur = grp.preloader().prepare(exchId,
+ exchFut,
+ cnt,
+ next,
+ forcedRebFut,
+ rebFut);
- r = cur;
- }
- }
+ if (cur != null)
+ next = cur;
}
rebFut.markInitialized();
@@ -3570,15 +3503,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (forcedRebFut != null)
forcedRebFut.markInitialized();
- if (r != null) {
- Collections.reverse(rebList);
-
- RebalanceFuture finalR = r;
+ if (next != null) {
+ RebalanceFuture finalR = next;
// Waits until compatible rebalances are finished.
// Start rebalancing cache groups chain. Each group will be rebalanced
// sequentially one by one e.g.:
// ignite-sys-cache -> cacheGroupR1 -> cacheGroupP2 -> cacheGroupR3
+ List<String> rebList = assignsSet.stream().map(CacheGroupContext::cacheOrGroupName)
+ .collect(Collectors.toList());
+
long rebId = cnt;
rebFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@@ -3594,6 +3528,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
});
}
else {
+ resVer = resVer == null ? assignsSet.stream()
+ .filter(g -> !g.isLocal())
+ .map(g -> g.topology().readyTopologyVersion())
+ .filter(Objects::nonNull)
+ .findFirst()
+ .orElse(exchId.topologyVersion()) : resVer;
+
U.log(log, "Skipping rebalancing (nothing scheduled) " +
"[top=" + resVer + ", force=" + (exchFut == null) +
", evt=" + exchId.discoveryEventName() +
@@ -3602,7 +3543,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
else {
U.log(log, "Skipping rebalancing (no affinity changes) " +
- "[top=" + resVer +
+ "[top=" + (resVer == null ? exchId.topologyVersion() : resVer) + // Parenthesized: '+' binds tighter than '==' and '?:'.
", evt=" + exchId.discoveryEventName() +
", evtNode=" + exchId.nodeId() +
", client=" + cctx.kernalContext().clientNode() + ']');
@@ -3638,10 +3579,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* Rebalance is not required on a client node and is always required when the exchange future is null.
- * In other cases, this method checks all caches and decides whether rebalancing is required or not
- * for the specific exchange.
*
- * @param exchFut Exchange future.
+ * @param exchFut Exchange future, or {@code null} if this is a forced rebalance task.
* @return {@code True} if rebalance is required at least for one of cache groups.
*/
private boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
@@ -3651,15 +3590,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (exchFut == null)
return true;
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
-
- if (grp.preloader().rebalanceRequired(exchFut))
- return true;
- }
-
- return false;
+ return lastAffinityChangedTopologyVersion(exchFut.topologyVersion()).equals(exchFut.topologyVersion());
}
}
@@ -4028,61 +3959,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* Represents a cache rebalance order that takes into account both values: rebalance order itself and rebalance mode.
* It is assumed SYNC caches should be rebalanced in the first place.
*/
- private static class CacheRebalanceOrder implements Comparable<CacheRebalanceOrder> {
- /** Cache rebalance order. */
- private int order;
-
- /** Cache rebalance mode. */
- private CacheRebalanceMode mode;
-
- /**
- * Creates a new instance of CacheRebalanceOrder.
- *
- * @param order Cache rebalance order.
- * @param mode Cache rebalance mode.
- */
- public CacheRebalanceOrder(int order, CacheRebalanceMode mode) {
- this.order = order;
- this.mode = mode;
- }
-
+ private static class CacheRebalanceOrderComparator implements Comparator<CacheGroupContext> {
/** {@inheritDoc} */
- @Override public int compareTo(@NotNull CacheRebalanceOrder o) {
- if (order == o.order) {
- if (mode == o.mode)
- return 0;
-
- switch (mode) {
- case SYNC: return -1;
- case ASYNC: return o.mode == CacheRebalanceMode.SYNC ? 1 : -1;
- case NONE: return 1;
+ @Override public int compare(CacheGroupContext ctx1, CacheGroupContext ctx2) {
+ CacheConfiguration<?, ?> cfg1 = ctx1.config();
+ CacheConfiguration<?, ?> cfg2 = ctx2.config();
+
+ if (cfg1.getRebalanceOrder() == cfg2.getRebalanceOrder()) {
+ if (cfg1.getRebalanceMode() == cfg2.getRebalanceMode())
+ return ctx1.cacheOrGroupName().compareTo(ctx2.cacheOrGroupName());
+
+ switch (cfg1.getRebalanceMode()) {
+ case SYNC:
+ return -1;
+ case ASYNC:
+ return cfg2.getRebalanceMode() == CacheRebalanceMode.SYNC ? 1 : -1;
+ case NONE:
+ return 1;
default:
- throw new IllegalArgumentException("Unknown cache rebalance mode [mode=" + mode + ']');
+ throw new IllegalArgumentException("Unknown cache rebalance mode [mode=" + cfg1.getRebalanceMode() + ']');
}
}
else
- return (order < o.order) ? -1 : 1;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- CacheRebalanceOrder order1 = (CacheRebalanceOrder)o;
-
- if (order != order1.order)
- return false;
- return mode == order1.mode;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int result = order;
- result = 31 * result + mode.hashCode();
- return result;
+ return (cfg1.getRebalanceOrder() < cfg2.getRebalanceOrder()) ? -1 : 1;
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 4da69ca..82bf6d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -26,13 +26,11 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.jetbrains.annotations.Nullable;
@@ -66,33 +64,17 @@ public interface GridCachePreloader {
public void onInitialExchangeComplete(@Nullable Throwable err);
/**
- * @param exchFut Completed exchange future.
- * @return {@code True} if rebalance should be started (previous will be interrupted).
- */
- public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut);
-
- /**
* @param exchId Exchange ID.
* @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs.
- * @return Partition assignments which will be requested from supplier nodes.
- */
- @Nullable public GridDhtPreloaderAssignments generateAssignments(
- GridDhtPartitionExchangeId exchId,
- @Nullable GridDhtPartitionsExchangeFuture exchFut);
-
- /**
- * Adds assignments to preloader.
- *
- * @param assignments Assignments to add.
- * @param forcePreload {@code True} if preload requested by {@link ForceRebalanceExchangeTask}.
* @param rebalanceId Rebalance id created by exchange thread.
- * @param next Rebalance's future follows after the current one.
+ * @param next Rebalance future that follows the current one.
* @param forcedRebFut External future for forced rebalance.
* @param compatibleRebFut Future for waiting for compatible rebalances.
* @return Future if rebalance was planned or null.
*/
- public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments,
- boolean forcePreload,
+ public RebalanceFuture prepare(
+ GridDhtPartitionExchangeId exchId,
+ @Nullable GridDhtPartitionsExchangeFuture exchFut,
long rebalanceId,
final RebalanceFuture next,
@Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 31d209b..2078204 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -138,25 +138,27 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
// No-op.
}
- /** {@inheritDoc} */
- @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments generateAssignments(
+ /**
+ * @param exchId Exchange ID.
+ * @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs.
+ * @return Partition assignments which will be requested from supplier nodes.
+ */
+ public GridDhtPreloaderAssignments generateAssignments(
GridDhtPartitionExchangeId exchId,
- GridDhtPartitionsExchangeFuture exchFut) {
+ GridDhtPartitionsExchangeFuture exchFut
+ ) {
return null;
}
/** {@inheritDoc} */
- @Override public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments,
- boolean forcePreload,
+ @Override public RebalanceFuture prepare(
+ GridDhtPartitionExchangeId exchId,
+ GridDhtPartitionsExchangeFuture exchFut,
long rebalanceId,
RebalanceFuture next,
@Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
- GridCompoundFuture<Boolean, Boolean> compatibleRebFut) {
+ GridCompoundFuture<Boolean, Boolean> compatibleRebFut
+ ) {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 8c3b446..21f2652 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -159,17 +159,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
- if (ctx.kernalContext().clientNode())
- return false; // No-op.
-
- AffinityTopologyVersion lastAffChangeTopVer =
- ctx.exchange().lastAffinityChangedTopologyVersion(exchFut.topologyVersion());
-
- return lastAffChangeTopVer.equals(exchFut.topologyVersion());
- }
-
- /** {@inheritDoc} */
@Override public GridDhtPreloaderAssignments generateAssignments(
GridDhtPartitionExchangeId exchId,
GridDhtPartitionsExchangeFuture exchFut
@@ -378,15 +367,20 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public RebalanceFuture addAssignments(
- GridDhtPreloaderAssignments assignments,
- boolean forceRebalance,
+ @Override public RebalanceFuture prepare(
+ GridDhtPartitionExchangeId exchId,
+ @Nullable GridDhtPartitionsExchangeFuture exchFut,
long rebalanceId,
final RebalanceFuture next,
@Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
GridCompoundFuture<Boolean, Boolean> compatibleRebFut
) {
- return demander.addAssignments(assignments, forceRebalance, rebalanceId, next, forcedRebFut, compatibleRebFut);
+ long delay = grp.config().getRebalanceDelay();
+ boolean forceRebalance = forcedRebFut != null;
+
+ // Don't delay for dummy reassigns to avoid infinite recursion.
+ return (delay == 0 || forceRebalance) ? demander.addAssignments(generateAssignments(exchId, exchFut), forceRebalance,
+ rebalanceId, next, forcedRebFut, compatibleRebFut) : null;
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index 4635830..b1fe03e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
-import com.google.common.collect.Lists;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
@@ -48,7 +47,6 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.ObjectGauge;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -194,37 +192,13 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
cache2.put(i, CACHE2 + "-" + i);
}
- final CountDownLatch startStopRebalanceLatch = new CountDownLatch(1);
- final CountDownLatch finishStopRebalanceLatch = new CountDownLatch(1);
- GridFutureAdapter<Void> stopRebalanceResFut = new GridFutureAdapter<>();
+ ignite = startGrid(1);
- ignite = startGrid(1, cfg -> {
- cfg.setLocalEventListeners(Collections.singletonMap(
- (IgnitePredicate<Event>)evt -> {
- startStopRebalanceLatch.countDown();
-
- try {
- assertTrue(finishStopRebalanceLatch.await(getTestTimeout(), TimeUnit.SECONDS));
-
- stopRebalanceResFut.onDone();
- }
- catch (Throwable e) {
- stopRebalanceResFut.onDone(e);
- }
-
- return false;
- },
- new int[] {EventType.EVT_CACHE_REBALANCE_STOPPED}
- ));
- });
-
- assertTrue(startStopRebalanceLatch.await(getTestTimeout(), TimeUnit.SECONDS));
+ awaitPartitionMapExchange(true, true, null, true);
CacheMetrics metrics1 = ignite.cache(CACHE1).localMetrics();
CacheMetrics metrics2 = ignite.cache(CACHE2).localMetrics();
- finishStopRebalanceLatch.countDown();
-
long rate1 = metrics1.getRebalancingKeysRate();
long rate2 = metrics2.getRebalancingKeysRate();
@@ -234,8 +208,6 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
assertEquals(metrics1.getRebalancedKeys(), rate1);
assertEquals(metrics2.getRebalancedKeys(), rate2);
-
- stopRebalanceResFut.get(getTestTimeout());
}
/**
@@ -245,7 +217,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
public void testCacheGroupRebalance() throws Exception {
IgniteEx ignite0 = startGrid(0);
- List<String> cacheNames = Lists.newArrayList(CACHE4, CACHE5);
+ List<String> cacheNames = Arrays.asList(CACHE4, CACHE5);
int allKeysCount = 0;
@@ -370,7 +342,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
IgniteEx ignite0 = startGrid(0);
- List<String> cacheNames = Lists.newArrayList(CACHE4, CACHE5);
+ List<String> cacheNames = Arrays.asList(CACHE4, CACHE5);
for (String cacheName : cacheNames) {
ignite0.getOrCreateCache(cacheName).putAll(new Random().ints(KEYS_COUNT).distinct().boxed()
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index e6f3bbaf..5c78e3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -1185,8 +1185,6 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
grid(0).resetLostPartitions(Collections.singleton(DEFAULT_CACHE_NAME));
}
- prim.context().cache().context().exchange().rebalanceDelay(500);
-
Random r = new Random();
AtomicBoolean stop = new AtomicBoolean();