You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2019/04/18 06:52:22 UTC

[ignite] branch master updated: IGNITE-11745 Fairly choose nodes for partition demanding during rebalance - Fixes #6453.

This is an automated email from the ASF dual-hosted git repository.

ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e971a9f  IGNITE-11745 Fairly choose nodes for partition demanding during rebalance - Fixes #6453.
e971a9f is described below

commit e971a9fad8e640e782116d0829b832a7a8140f37
Author: Ilya Kasnacheev <il...@gmail.com>
AuthorDate: Thu Apr 18 09:47:28 2019 +0300

    IGNITE-11745 Fairly choose nodes for partition demanding during rebalance - Fixes #6453.
    
    Also adds EVT_CACHE_REBALANCE_PART_SUPPLIED and EVT_CACHE_REBALANCE_PART_MISSED events.
    
    Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
 .../ignite/events/CacheRebalancingEvent.java       |   4 +-
 .../java/org/apache/ignite/events/EventType.java   |  24 +++-
 .../processors/cache/CacheGroupContext.java        |  50 +++++++
 .../dht/preloader/GridDhtPartitionSupplier.java    |  14 ++
 .../dht/preloader/GridDhtPreloader.java            |   2 +-
 .../GridCachePartitionedSupplyEventsSelfTest.java  | 160 +++++++++++++++++++++
 .../testsuites/IgniteCacheMvccTestSuite2.java      |   2 +
 .../ignite/testsuites/IgniteCacheTestSuite2.java   |   2 +
 8 files changed, 255 insertions(+), 3 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java
index 5ff424b..22f8d46 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java
@@ -62,6 +62,8 @@ import org.apache.ignite.lang.IgnitePredicate;
  * @see EventType#EVT_CACHE_REBALANCE_STARTED
  * @see EventType#EVT_CACHE_REBALANCE_STOPPED
  * @see EventType#EVT_CACHE_REBALANCE_PART_DATA_LOST
+ * @see EventType#EVT_CACHE_REBALANCE_PART_SUPPLIED
+ * @see EventType#EVT_CACHE_REBALANCE_PART_MISSED
  */
 public class CacheRebalancingEvent extends EventAdapter {
     /** */
@@ -183,4 +185,4 @@ public class CacheRebalancingEvent extends EventAdapter {
             "type", name(),
             "tstamp", timestamp());
     }
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 4c9368f..a2b6ba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -585,6 +585,26 @@ public interface EventType {
     public static final int EVT_CACHE_REBALANCE_PART_DATA_LOST = 86;
 
     /**
+     * Built-in event type: cache partition was fully sent to remote node.
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     *
+     * @see CacheRebalancingEvent
+     */
+    public static final int EVT_CACHE_REBALANCE_PART_SUPPLIED = 87;
+
+    /**
+     * Built-in event type: cache partition was not sent to remote node.
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     *
+     * @see CacheRebalancingEvent
+     */
+    public static final int EVT_CACHE_REBALANCE_PART_MISSED = 88;
+
+    /**
      * Built-in event type: query executed.
      * <p>
      * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
@@ -1038,7 +1058,9 @@ public interface EventType {
         EVT_CACHE_REBALANCE_PART_UNLOADED,
         EVT_CACHE_REBALANCE_OBJECT_LOADED,
         EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
-        EVT_CACHE_REBALANCE_PART_DATA_LOST
+        EVT_CACHE_REBALANCE_PART_DATA_LOST,
+        EVT_CACHE_REBALANCE_PART_SUPPLIED,
+        EVT_CACHE_REBALANCE_PART_MISSED
     };
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 9329a6c..0960b4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -69,6 +69,8 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_MISSED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 import static org.apache.ignite.internal.stat.IoStatisticsHolderIndex.HASH_PK_IDX_NAME;
@@ -502,6 +504,54 @@ public class CacheGroupContext {
     }
 
     /**
+     * Adds partition supply event.
+     *
+     * @param part Partition.
+     */
+    public void addRebalanceSupplyEvent(int part) {
+        // Callers are expected to check recordability first; warn when they did not.
+        if (!eventRecordable(EVT_CACHE_REBALANCE_PART_SUPPLIED)) {
+            LT.warn(log, "Added event without checking if event is recordable: " +
+                U.gridEventName(EVT_CACHE_REBALANCE_PART_SUPPLIED));
+        }
+
+        // Copy the field reference into a local before iterating.
+        List<GridCacheContext> grpCaches = this.caches;
+
+        for (GridCacheContext cacheCtx : grpCaches) {
+            if (cacheCtx.config().isEventsDisabled())
+                continue;
+
+            cacheCtx.gridEvents().record(new CacheRebalancingEvent(cacheCtx.name(), cacheCtx.localNode(),
+                "Cache partition supplied event.", EVT_CACHE_REBALANCE_PART_SUPPLIED, part, null, 0, 0));
+        }
+    }
+
+    /**
+     * Adds partition miss event.
+     *
+     * @param part Partition.
+     */
+    public void addRebalanceMissEvent(int part) {
+        // The caller should have checked recordability already; warn when it has not.
+        if (!eventRecordable(EVT_CACHE_REBALANCE_PART_MISSED)) {
+            LT.warn(log, "Added event without checking if event is recordable: " +
+                U.gridEventName(EVT_CACHE_REBALANCE_PART_MISSED));
+        }
+
+        // Copy the field reference into a local before iterating.
+        List<GridCacheContext> grpCaches = this.caches;
+
+        for (GridCacheContext cacheCtx : grpCaches) {
+            if (cacheCtx.config().isEventsDisabled())
+                continue;
+
+            cacheCtx.gridEvents().record(new CacheRebalancingEvent(cacheCtx.name(), cacheCtx.localNode(),
+                "Cache partition missed event.", EVT_CACHE_REBALANCE_PART_MISSED, part, null, 0, 0));
+        }
+    }
+
+    /**
      * @param part Partition.
      * @param key Key.
      * @param evtNodeId Event node ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 514f8fd..7f52856 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -54,6 +54,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.IgniteSpiException;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_MISSED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 
 /**
@@ -365,6 +367,9 @@ class GridDhtPartitionSupplier {
 
                     remainingParts.remove(part);
 
+                    if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_MISSED))
+                        grp.addRebalanceMissEvent(part);
+
                     if (log.isDebugEnabled())
                         log.debug("Requested partition is marked as missing ["
                             + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", p=" + part + "]");
@@ -392,6 +397,9 @@ class GridDhtPartitionSupplier {
                     supplyMsg.last(part, loc.updateCounter());
 
                     remainingParts.remove(part);
+
+                    if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_SUPPLIED))
+                        grp.addRebalanceSupplyEvent(part);
                 }
             }
 
@@ -409,11 +417,17 @@ class GridDhtPartitionSupplier {
                     supplyMsg.last(p, loc.updateCounter());
 
                     remainingIter.remove();
+
+                    if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_SUPPLIED))
+                        grp.addRebalanceSupplyEvent(p);
                 }
                 else if (iter.isPartitionMissing(p)) {
                     supplyMsg.missed(p);
 
                     remainingIter.remove();
+
+                    if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_MISSED))
+                        grp.addRebalanceMissEvent(p);
                 }
             }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 042e0ea..ff420a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -311,7 +311,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                             log.debug("Owning partition as there are no other owners: " + part);
                     }
                     else {
-                        ClusterNode n = picked.get(0);
+                        ClusterNode n = picked.get(p % picked.size());
 
                         GridDhtPartitionDemandMessage msg = assignments.get(n);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedSupplyEventsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedSupplyEventsSelfTest.java
new file mode 100644
index 0000000..0d974fa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedSupplyEventsSelfTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+
+/**
+ * Checks partition supplied events recorded during rebalance; missed-partition events are only logged.
+ */
+public class GridCachePartitionedSupplyEventsSelfTest extends GridCommonAbstractTest {
+    /** Default cache name with cache events disabled. */
+    private static final String DEFAULT_CACHE_NAME_EVTS_DISABLED = DEFAULT_CACHE_NAME + "EvtsDisabled";
+
+    /** Number of additional nodes started one by one after the first node. */
+    public static final int NODES = 7;
+
+    /** Partition count passed to the affinity function. */
+    public static final int PARTS = 8192;
+
+    /** Number of partition supplied events seen so far, keyed by the ID of the node reported in the event. */
+    private final ConcurrentHashMap<UUID, Integer> nodesToPartsSupplied = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        CacheConfiguration<?, ?> ccfg = cacheConfiguration();
+
+        CacheConfiguration<?, ?> ccfgEvtsDisabled = new CacheConfiguration<>(ccfg);
+
+        ccfgEvtsDisabled.setName(DEFAULT_CACHE_NAME_EVTS_DISABLED);
+        ccfgEvtsDisabled.setEventsDisabled(true);
+
+        cfg.setCacheConfiguration(ccfg, ccfgEvtsDisabled);
+
+        cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED, EventType.EVT_CACHE_REBALANCE_PART_MISSED);
+
+        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+        lsnrs.put(new IgnitePredicate<CacheRebalancingEvent>() {
+                @Override public boolean apply(CacheRebalancingEvent evt) {
+                    nodesToPartsSupplied.merge(evt.node().id(), 1, Integer::sum);
+
+                    assertEquals(DEFAULT_CACHE_NAME, evt.cacheName());
+
+                    return true;
+                }
+            }, new int[]{EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED});
+
+        lsnrs.put(new IgnitePredicate<CacheRebalancingEvent>() {
+                @Override public boolean apply(CacheRebalancingEvent evt) {
+                    log.warning("Missed partition " + evt.partition() + " from node " + evt.node().consistentId());
+
+                    assertEquals(DEFAULT_CACHE_NAME, evt.cacheName());
+
+                    return true;
+                }
+            }, new int[]{EventType.EVT_CACHE_REBALANCE_PART_MISSED});
+
+        cfg.setLocalEventListeners(lsnrs);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setRebalanceMode(SYNC);
+        cacheCfg.setAffinity(new RendezvousAffinityFunction(false, PARTS));
+        cacheCfg.setBackups(3);
+        return cacheCfg;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testSupplyEvents() throws Exception {
+        Ignite g = startGrid("g0");
+
+        IgniteCache<Integer, Integer> c1 = g.cache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, Integer> c0 = g.cache(DEFAULT_CACHE_NAME_EVTS_DISABLED);
+
+        for (int k = 0; k < PARTS * 2; k++) {
+            c1.put(k, k);
+            c0.put(k, k);
+        }
+
+        for (int n = 1; n <= NODES; n++) {
+            assertTrue(nodesToPartsSupplied.isEmpty());
+
+            startGrid("g" + n);
+
+            int max = 0;
+            int min = PARTS;
+            int total = 0;
+
+            for (int supplied : nodesToPartsSupplied.values()) {
+                assertTrue(supplied > 0);
+
+                max = Math.max(max, supplied);
+                min = Math.min(min, supplied);
+
+                total += supplied;
+            }
+
+            log.info("After start-up of node " + n + " each node supplied " + min + "-" + max + " partitions, total of " + total);
+
+            assertEquals(n, nodesToPartsSupplied.size());
+
+            assertTrue(max > 0);
+            assertTrue(total > 0);
+
+            // Data is fully replicated until the 5th node (n == 4) joins: total must be PARTS only before that.
+            assertTrue((total == PARTS) != (n >= 4));
+
+            // Quality of distribution of supplied partitions
+            assertTrue(max - min < (PARTS >> 4));
+
+            nodesToPartsSupplied.clear();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
index c7467e7..4f4bd81 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColo
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxSingleThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtAtomicEvictionNearReadersSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtPreloadOnheapSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedSupplyEventsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedUnloadEventsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedBackupNodeFailureRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearEvictionEventSelfTest;
@@ -160,6 +161,7 @@ public class IgniteCacheMvccTestSuite2 {
         ignoredTests.add(IgniteCacheServerNodeConcurrentStart.class);
 
         ignoredTests.add(GridCachePartitionedUnloadEventsSelfTest.class);
+        ignoredTests.add(GridCachePartitionedSupplyEventsSelfTest.class);
 
         ignoredTests.add(IgniteNoCustomEventsOnNodeStart.class);
         ignoredTests.add(CacheExchangeMessageDuplicatedStateTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 4d271d2..5d67634 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtPreloadStartStopSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtPreloadUnloadSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledLockSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedSupplyEventsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedTopologyChangeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedUnloadEventsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheClearDuringRebalanceTest;
@@ -262,6 +263,7 @@ public class IgniteCacheTestSuite2 {
 
         GridTestUtils.addTestIfNeeded(suite, GridCachePartitionedTopologyChangeSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCachePartitionedUnloadEventsSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, GridCachePartitionedSupplyEventsSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheColocatedOptimisticTransactionSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheAtomicMessageCountSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheNearPartitionedClearSelfTest.class, ignoredTests);