You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/03/09 20:39:11 UTC

[ignite] branch master updated: IGNITE-12705 SYNC caches are rebalanced in the first place.

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d97f62  IGNITE-12705 SYNC caches are rebalanced in the first place.
6d97f62 is described below

commit 6d97f625f856d0d5d05864497d2f88bc53d91738
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Mon Mar 9 23:38:39 2020 +0300

    IGNITE-12705 SYNC caches are rebalanced in the first place.
---
 .../ignite/configuration/CacheConfiguration.java   |  12 +-
 .../cache/GridCachePartitionExchangeManager.java   |  71 ++++++++-
 .../rebalancing/GridCacheRebalanceOrderTest.java   | 177 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite8.java   |   2 +
 4 files changed, 252 insertions(+), 10 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 36e0b29..be92fd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1202,15 +1202,13 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /**
      * Gets cache rebalance order. Rebalance order can be set to non-zero value for caches with
      * {@link CacheRebalanceMode#SYNC SYNC} or {@link CacheRebalanceMode#ASYNC ASYNC} rebalance modes only.
+     * Note that caches with {@link CacheRebalanceMode#SYNC SYNC} rebalancing mode are always rebalanced prior to caches
+     * with {@link CacheRebalanceMode#ASYNC ASYNC} rebalancing mode when rebalancing order is the same.
      * <p/>
-     * If cache rebalance order is positive, rebalancing for this cache will be started only when rebalancing for
+     * The rebalance order guarantees that rebalancing for this cache will start only when rebalancing for
      * all caches with smaller rebalance order will be completed.
      * <p/>
-     * Note that cache with order {@code 0} does not participate in ordering. This means that cache with
-     * rebalance order {@code 0} will never wait for any other caches. All caches with order {@code 0} will
-     * be rebalanced right away concurrently with each other and ordered rebalance processes.
-     * <p/>
-     * If not set, cache order is 0, i.e. rebalancing is not ordered.
+     * If not set, cache order is 0.
      *
      * @return Cache rebalance order.
      */
@@ -2358,7 +2356,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
      */
     public CacheConfiguration<K, V> setEncryptionEnabled(boolean encryptionEnabled) {
         this.encryptionEnabled = encryptionEnabled;
-        
+
         return this;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 89d4f3b..f5faab4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -51,6 +51,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterGroup;
@@ -3377,14 +3378,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (assignsMap != null && rebTopVer.equals(NONE)) {
                         int size = assignsMap.size();
 
-                        NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
+                        NavigableMap<CacheRebalanceOrder, List<Integer>> orderMap = new TreeMap<>();
 
                         for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
                             int grpId = e.getKey();
 
                             CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                            int order = grp.config().getRebalanceOrder();
+                            CacheRebalanceOrder order = new CacheRebalanceOrder(
+                                grp.config().getRebalanceOrder(),
+                                grp.config().getRebalanceMode());
 
                             if (orderMap.get(order) == null)
                                 orderMap.put(order, new ArrayList<Integer>(size));
@@ -3403,7 +3406,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         if (task instanceof ForceRebalanceExchangeTask)
                             forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
 
-                        for (Integer order : orderMap.descendingKeySet()) {
+                        for (CacheRebalanceOrder order : orderMap.descendingKeySet()) {
                             for (Integer grpId : orderMap.get(order)) {
                                 CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
@@ -3852,4 +3855,66 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             activeObjects.clear();
         }
     }
+
+    /**
+     * Represents a cache rebalance order that takes into account both values: rebalance order itself and rebalance mode.
+     * For equal rebalance order values, SYNC caches are ordered before ASYNC caches, and ASYNC before NONE.
+     */
+    private static class CacheRebalanceOrder implements Comparable<CacheRebalanceOrder> {
+        /** Cache rebalance order. */
+        private int order;
+
+        /** Cache rebalance mode. */
+        private CacheRebalanceMode mode;
+
+        /**
+         * Creates a new instance of CacheRebalanceOrder.
+         *
+         * @param order Cache rebalance order.
+         * @param mode Cache rebalance mode.
+         */
+        public CacheRebalanceOrder(int order, CacheRebalanceMode mode) {
+            this.order = order;
+            this.mode = mode;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(@NotNull CacheRebalanceOrder o) {
+            if (order == o.order) {
+                if (mode == o.mode)
+                    return 0;
+
+                switch (mode) {
+                    case SYNC: return -1;
+                    case ASYNC: return o.mode == CacheRebalanceMode.SYNC ? 1 : -1;
+                    case NONE: return 1;
+                    default:
+                        throw new IllegalArgumentException("Unknown cache rebalance mode [mode=" + mode + ']');
+                }
+            }
+            else
+                return (order < o.order) ? -1 : 1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            CacheRebalanceOrder order1 = (CacheRebalanceOrder)o;
+
+            if (order != order1.order)
+                return false;
+            return mode == order1.mode;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = order;
+            result = 31 * result + mode.hashCode();
+            return result;
+        }
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalanceOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalanceOrderTest.java
new file mode 100644
index 0000000..99ecc19
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalanceOrderTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
+
+/**
+ * Tests that caches whose rebalance mode is SYNC are rebalanced first.
+ */
+public class GridCacheRebalanceOrderTest extends GridCommonAbstractTest {
+    /** Rebalance timeout. */
+    private static final long REBALANCE_TIMEOUT = 5_000;
+
+    /** Flag indicating that a local listener should be used to track rebalance events. */
+    private boolean trackRebalanceEvts;
+
+    /** Caches rebalance events. */
+    private final List<CacheRebalancingEvent> evtsList = Collections.synchronizedList(new ArrayList<>());
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (trackRebalanceEvts) {
+            Map<IgnitePredicate<? extends Event>, int[]> listeners = new HashMap<>();
+
+            listeners.put(new IgnitePredicate<CacheRebalancingEvent>() {
+                @Override public boolean apply(CacheRebalancingEvent evt) {
+                    evtsList.add(evt);
+
+                    return true;
+                }
+            }, new int[] {EVT_CACHE_REBALANCE_STOPPED});
+
+            cfg.setLocalEventListeners(listeners);
+
+            cfg.setIncludeEventTypes(EventType.EVTS_ALL);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * Stops all nodes after test.
+     */
+    @After
+    public void testCleanup() {
+        stopAllGrids();
+    }
+
+    /**
+     * Tests that caches whose rebalance mode is SYNC are rebalanced first.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceOrderBasedOnCacheRebalanceMode() throws Exception {
+        Ignite g = startGrid(0);
+
+        // Fix the expected order of rebalance.
+        List<IgniteBiTuple<Integer, CacheRebalanceMode>> order = new ArrayList<>();
+        order.add(new T2<>(0, SYNC));
+        order.add(new T2<>(0, SYNC));
+        order.add(new T2<>(0, ASYNC));
+        order.add(new T2<>(0, ASYNC));
+        order.add(new T2<>(1, SYNC));
+        order.add(new T2<>(1, SYNC));
+        order.add(new T2<>(1, ASYNC));
+        order.add(new T2<>(1, ASYNC));
+
+        // Prepare caches with different rebalance mode and order.
+        List<IgniteCache<Integer, Integer>> caches = new ArrayList<>();
+        for (int i = order.size() - 1; i >= 0; i--) {
+            int rebalanceOrder = order.get(i).get1();
+
+            CacheRebalanceMode rebalanceMode = order.get(i).get2();
+
+            caches.add(g.getOrCreateCache(getCacheConfiguration(
+                "cache-" + i + "-order-" + rebalanceOrder + "-mode-" + rebalanceMode,
+                rebalanceOrder,
+                rebalanceMode)));
+        }
+
+        // Fill values.
+        for (IgniteCache<Integer, Integer> c : caches)
+            c.put(12, 21);
+
+        trackRebalanceEvts = true;
+
+        Ignite g1 = startGrid(1);
+
+        // Wait for all rebalance futures.
+        for (IgniteCache<Integer, Integer> c : caches)
+            grid(1).context().cache().internalCache(c.getName()).preloader().syncFuture().get(REBALANCE_TIMEOUT);
+
+        // Check that all events were fired.
+        assertEquals("Expected rebalance events were not triggered.", order.size(), evtsList.size());
+
+        // Check rebalance order.
+        for (int i = 0; i < order.size(); ++i) {
+            int expOrder = order.get(i).get1();
+
+            CacheRebalanceMode expMode = order.get(i).get2();
+
+            CacheRebalancingEvent actualEvt = evtsList.get(i);
+
+            CacheConfiguration<Integer, Integer> actualCfg = g1.cache(actualEvt.cacheName())
+                .getConfiguration(CacheConfiguration.class);
+
+            assertEquals(
+                "Unexpected rebalance order [cacheName=" + actualEvt.cacheName() + ']',
+                expOrder,
+                actualCfg.getRebalanceOrder());
+
+            assertEquals(
+                "Unexpected cache rebalance mode [cacheName=" + actualEvt.cacheName() + ']',
+                expMode,
+                actualCfg.getRebalanceMode());
+        }
+    }
+
+    /**
+     * Creates a new cache configuration with the given parameters.
+     *
+     * @param cacheName Cache name.
+     * @param rebalanceOrder Rebalance order.
+     * @param rebalanceMode Rebalance mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> getCacheConfiguration(
+        String cacheName,
+        int rebalanceOrder,
+        CacheRebalanceMode rebalanceMode
+    ) {
+        return new CacheConfiguration<Integer, Integer>(cacheName)
+            .setRebalanceOrder(rebalanceOrder)
+            .setRebalanceMode(rebalanceMode)
+            .setBackups(1);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
index 2908892..894918e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.ignite.internal.processors.cache.CacheStoreTxPutAllMultiNodeTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOrderedPreloadingSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalanceOrderTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingCancelTest;
@@ -66,6 +67,7 @@ public class IgniteCacheTestSuite8 {
 
         // Rebalancing.
         GridTestUtils.addTestIfNeeded(suite, GridCacheOrderedPreloadingSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, GridCacheRebalanceOrderTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingSyncSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingSyncCheckDataTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheRebalancingUnmarshallingFailedSelfTest.class, ignoredTests);