You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/03/09 20:39:11 UTC
[ignite] branch master updated: IGNITE-12705 SYNC caches are
rebalanced in the first place.
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6d97f62 IGNITE-12705 SYNC caches are rebalanced in the first place.
6d97f62 is described below
commit 6d97f625f856d0d5d05864497d2f88bc53d91738
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Mon Mar 9 23:38:39 2020 +0300
IGNITE-12705 SYNC caches are rebalanced in the first place.
---
.../ignite/configuration/CacheConfiguration.java | 12 +-
.../cache/GridCachePartitionExchangeManager.java | 71 ++++++++-
.../rebalancing/GridCacheRebalanceOrderTest.java | 177 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite8.java | 2 +
4 files changed, 252 insertions(+), 10 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 36e0b29..be92fd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1202,15 +1202,13 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/**
* Gets cache rebalance order. Rebalance order can be set to non-zero value for caches with
* {@link CacheRebalanceMode#SYNC SYNC} or {@link CacheRebalanceMode#ASYNC ASYNC} rebalance modes only.
+ * Note that caches with {@link CacheRebalanceMode#SYNC SYNC} rebalancing mode are always rebalanced prior to caches
+ * with {@link CacheRebalanceMode#ASYNC ASYNC} rebalancing mode when rebalancing order is the same.
* <p/>
- * If cache rebalance order is positive, rebalancing for this cache will be started only when rebalancing for
+ * The rebalance order guarantees that rebalancing for this cache will start only when rebalancing for
* all caches with smaller rebalance order will be completed.
* <p/>
- * Note that cache with order {@code 0} does not participate in ordering. This means that cache with
- * rebalance order {@code 0} will never wait for any other caches. All caches with order {@code 0} will
- * be rebalanced right away concurrently with each other and ordered rebalance processes.
- * <p/>
- * If not set, cache order is 0, i.e. rebalancing is not ordered.
+ * If not set, cache order is 0.
*
* @return Cache rebalance order.
*/
@@ -2358,7 +2356,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
*/
public CacheConfiguration<K, V> setEncryptionEnabled(boolean encryptionEnabled) {
this.encryptionEnabled = encryptionEnabled;
-
+
return this;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 89d4f3b..f5faab4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -51,6 +51,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterGroup;
@@ -3377,14 +3378,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (assignsMap != null && rebTopVer.equals(NONE)) {
int size = assignsMap.size();
- NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
+ NavigableMap<CacheRebalanceOrder, List<Integer>> orderMap = new TreeMap<>();
for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
int grpId = e.getKey();
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
- int order = grp.config().getRebalanceOrder();
+ CacheRebalanceOrder order = new CacheRebalanceOrder(
+ grp.config().getRebalanceOrder(),
+ grp.config().getRebalanceMode());
if (orderMap.get(order) == null)
orderMap.put(order, new ArrayList<Integer>(size));
@@ -3403,7 +3406,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (task instanceof ForceRebalanceExchangeTask)
forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
- for (Integer order : orderMap.descendingKeySet()) {
+ for (CacheRebalanceOrder order : orderMap.descendingKeySet()) {
for (Integer grpId : orderMap.get(order)) {
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
@@ -3852,4 +3855,66 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
activeObjects.clear();
}
}
+
+ /**
+ * Sort key for cache groups that combines the configured rebalance order with the rebalance mode.
+ * Keys compare by order first; for equal orders the mode ranks SYNC before ASYNC before NONE.
+ */
+ private static class CacheRebalanceOrder implements Comparable<CacheRebalanceOrder> {
+ /** Cache rebalance order (primary sort criterion). */
+ private int order;
+
+ /** Cache rebalance mode (tie-breaker for equal orders); dereferenced without a null check. */
+ private CacheRebalanceMode mode;
+
+ /**
+ * Creates a new instance of CacheRebalanceOrder.
+ *
+ * @param order Cache rebalance order.
+ * @param mode Cache rebalance mode.
+ */
+ public CacheRebalanceOrder(int order, CacheRebalanceMode mode) {
+ this.order = order;
+ this.mode = mode;
+ }
+
+ /** Compares by order, then by mode (SYNC, ASYNC, NONE); returns 0 only if both order and mode match. */
+ @Override public int compareTo(@NotNull CacheRebalanceOrder o) {
+ if (order == o.order) {
+ if (mode == o.mode)
+ return 0;
+
+ switch (mode) { // Modes differ at this point, so every branch returns a non-zero result.
+ case SYNC: return -1; // SYNC sorts before any other mode.
+ case ASYNC: return o.mode == CacheRebalanceMode.SYNC ? 1 : -1; // After SYNC, before the rest.
+ case NONE: return 1; // NONE sorts after any other mode.
+ default:
+ throw new IllegalArgumentException("Unknown cache rebalance mode [mode=" + mode + ']');
+ }
+ }
+ else
+ return (order < o.order) ? -1 : 1;
+ }
+
+ /** Two keys are equal when both order and mode match, which agrees with {@link #compareTo} returning 0. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ CacheRebalanceOrder order1 = (CacheRebalanceOrder)o;
+
+ if (order != order1.order)
+ return false;
+ return mode == order1.mode;
+ }
+
+ /** Hash is derived from the same two fields that {@link #equals(Object)} compares. */
+ @Override public int hashCode() {
+ int result = order;
+ result = 31 * result + mode.hashCode();
+ return result;
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalanceOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalanceOrderTest.java
new file mode 100644
index 0000000..99ecc19
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalanceOrderTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
+
+/**
+ * Tests that, among caches with the same rebalance order, caches in SYNC rebalance mode are rebalanced first.
+ */
+public class GridCacheRebalanceOrderTest extends GridCommonAbstractTest {
+ /** Rebalance timeout. */
+ private static final long REBALANCE_TIMEOUT = 5_000;
+
+ /** When {@code true}, {@link #getConfiguration(String)} registers a local listener for rebalance events. */
+ private boolean trackRebalanceEvts;
+
+ /** Recorded {@code EVT_CACHE_REBALANCE_STOPPED} events, in the order the listener received them. */
+ private final List<CacheRebalancingEvent> evtsList = Collections.synchronizedList(new ArrayList<>());
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (trackRebalanceEvts) {
+ Map<IgnitePredicate<? extends Event>, int[]> listeners = new HashMap<>();
+
+ listeners.put(new IgnitePredicate<CacheRebalancingEvent>() {
+ @Override public boolean apply(CacheRebalancingEvent evt) {
+ evtsList.add(evt);
+
+ return true;
+ }
+ }, new int[] {EVT_CACHE_REBALANCE_STOPPED});
+
+ cfg.setLocalEventListeners(listeners);
+
+ cfg.setIncludeEventTypes(EventType.EVTS_ALL);
+ }
+
+ return cfg;
+ }
+
+ /**
+ * Stops all nodes after each test.
+ */
+ @After
+ public void testCleanup() {
+ stopAllGrids();
+ }
+
+ /**
+ * Tests that, among caches with the same rebalance order, caches in SYNC rebalance mode are rebalanced first.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceOrderBasedOnCacheRebalanceMode() throws Exception {
+ Ignite g = startGrid(0);
+
+ // Expected rebalance sequence: ascending rebalance order, SYNC before ASYNC within the same order.
+ List<IgniteBiTuple<Integer, CacheRebalanceMode>> order = new ArrayList<>();
+ order.add(new T2<>(0, SYNC));
+ order.add(new T2<>(0, SYNC));
+ order.add(new T2<>(0, ASYNC));
+ order.add(new T2<>(0, ASYNC));
+ order.add(new T2<>(1, SYNC));
+ order.add(new T2<>(1, SYNC));
+ order.add(new T2<>(1, ASYNC));
+ order.add(new T2<>(1, ASYNC));
+
+ // Create the caches in the reverse of the expected sequence.
+ List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
+ for (int i = order.size() - 1; i >= 0; i--) {
+ int rebalanceOrder = order.get(i).get1();
+
+ CacheRebalanceMode rebalanceMode = order.get(i).get2();
+
+ caches.add(g.getOrCreateCache(getCacheConfiguration(
+ "cache-" + i + "-order-" + rebalanceOrder + "-mode-" + rebalanceMode,
+ rebalanceOrder,
+ rebalanceMode)));
+ }
+
+ // Put one entry into every cache.
+ for (IgniteCache<Integer, Integer> c : caches)
+ c.put(12, 21);
+
+ trackRebalanceEvts = true; // Enabled only now, after grid 0 has already been started.
+
+ Ignite g1 = startGrid(1);
+
+ // Wait for the preloader sync future of every cache on grid 1.
+ for (IgniteCache<Integer, Integer> c : caches)
+ grid(1).context().cache().internalCache(c.getName()).preloader().syncFuture().get(REBALANCE_TIMEOUT);
+
+ // Check that exactly one event per cache was recorded.
+ assertEquals("Expected rebalance events were not triggered.", order.size(), evtsList.size());
+
+ // Check rebalance order: the i-th recorded event must match the i-th expected (order, mode) pair.
+ for (int i = 0; i < order.size(); ++i) {
+ int expOrder = order.get(i).get1();
+
+ CacheRebalanceMode expMode = order.get(i).get2();
+
+ CacheRebalancingEvent actualEvt = evtsList.get(i);
+
+ CacheConfiguration<Integer, Integer> actualCfg = g1.cache(actualEvt.cacheName())
+ .getConfiguration(CacheConfiguration.class);
+
+ assertEquals(
+ "Unexpected rebalance order [cacheName=" + actualEvt.cacheName() + ']',
+ expOrder,
+ actualCfg.getRebalanceOrder());
+
+ assertEquals(
+ "Unexpected cache rebalance mode [cacheName=" + actualEvt.cacheName() + ']',
+ expMode,
+ actualCfg.getRebalanceMode());
+ }
+ }
+
+ /**
+ * Creates a new cache configuration with the given name, rebalance order and rebalance mode, and one backup.
+ *
+ * @param cacheName Cache name.
+ * @param rebalanceOrder Rebalance order.
+ * @param rebalanceMode Rebalance mode.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> getCacheConfiguration(
+ String cacheName,
+ int rebalanceOrder,
+ CacheRebalanceMode rebalanceMode
+ ) {
+ return new CacheConfiguration<Integer, Integer>(cacheName)
+ .setRebalanceOrder(rebalanceOrder)
+ .setRebalanceMode(rebalanceMode)
+ .setBackups(1);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
index 2908892..894918e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.ignite.internal.processors.cache.CacheStoreTxPutAllMultiNodeTest;
import org.apache.ignite.internal.processors.cache.GridCacheOrderedPreloadingSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalanceOrderTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingCancelTest;
@@ -66,6 +67,7 @@ public class IgniteCacheTestSuite8 {
// Rebalancing.
GridTestUtils.addTestIfNeeded(suite, GridCacheOrderedPreloadingSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, GridCacheRebalanceOrderTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingSyncSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingSyncCheckDataTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingUnmarshallingFailedSelfTest.class, ignoredTests);