You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2019/04/18 06:52:22 UTC
[ignite] branch master updated: IGNITE-11745 Fairly choose nodes
for partition demanding during rebalance - Fixes #6453.
This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e971a9f IGNITE-11745 Fairly choose nodes for partition demanding during rebalance - Fixes #6453.
e971a9f is described below
commit e971a9fad8e640e782116d0829b832a7a8140f37
Author: Ilya Kasnacheev <il...@gmail.com>
AuthorDate: Thu Apr 18 09:47:28 2019 +0300
IGNITE-11745 Fairly choose nodes for partition demanding during rebalance - Fixes #6453.
Also adds EVT_CACHE_REBALANCE_PART_SUPPLIED and EVT_CACHE_REBALANCE_PART_MISSED events.
Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
.../ignite/events/CacheRebalancingEvent.java | 4 +-
.../java/org/apache/ignite/events/EventType.java | 24 +++-
.../processors/cache/CacheGroupContext.java | 50 +++++++
.../dht/preloader/GridDhtPartitionSupplier.java | 14 ++
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../GridCachePartitionedSupplyEventsSelfTest.java | 160 +++++++++++++++++++++
.../testsuites/IgniteCacheMvccTestSuite2.java | 2 +
.../ignite/testsuites/IgniteCacheTestSuite2.java | 2 +
8 files changed, 255 insertions(+), 3 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java
index 5ff424b..22f8d46 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/CacheRebalancingEvent.java
@@ -62,6 +62,8 @@ import org.apache.ignite.lang.IgnitePredicate;
* @see EventType#EVT_CACHE_REBALANCE_STARTED
* @see EventType#EVT_CACHE_REBALANCE_STOPPED
* @see EventType#EVT_CACHE_REBALANCE_PART_DATA_LOST
+ * @see EventType#EVT_CACHE_REBALANCE_PART_SUPPLIED
+ * @see EventType#EVT_CACHE_REBALANCE_PART_MISSED
*/
public class CacheRebalancingEvent extends EventAdapter {
/** */
@@ -183,4 +185,4 @@ public class CacheRebalancingEvent extends EventAdapter {
"type", name(),
"tstamp", timestamp());
}
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 4c9368f..a2b6ba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -585,6 +585,26 @@ public interface EventType {
public static final int EVT_CACHE_REBALANCE_PART_DATA_LOST = 86;
/**
+ * Built-in event type: cache partition was fully sent to remote node.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see CacheRebalancingEvent
+ */
+ public static final int EVT_CACHE_REBALANCE_PART_SUPPLIED = 87;
+
+ /**
+ * Built-in event type: cache partition was not sent to remote node.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see CacheRebalancingEvent
+ */
+ public static final int EVT_CACHE_REBALANCE_PART_MISSED = 88;
+
+ /**
* Built-in event type: query executed.
* <p>
* NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
@@ -1038,7 +1058,9 @@ public interface EventType {
EVT_CACHE_REBALANCE_PART_UNLOADED,
EVT_CACHE_REBALANCE_OBJECT_LOADED,
EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
- EVT_CACHE_REBALANCE_PART_DATA_LOST
+ EVT_CACHE_REBALANCE_PART_DATA_LOST,
+ EVT_CACHE_REBALANCE_PART_SUPPLIED,
+ EVT_CACHE_REBALANCE_PART_MISSED
};
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 9329a6c..0960b4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -69,6 +69,8 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_MISSED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
import static org.apache.ignite.internal.stat.IoStatisticsHolderIndex.HASH_PK_IDX_NAME;
@@ -502,6 +504,54 @@ public class CacheGroupContext {
}
/**
+ * Adds partition supply event.
+ *
+ * @param part Partition.
+ */
+ public void addRebalanceSupplyEvent(int part) {
+ if (!eventRecordable(EVT_CACHE_REBALANCE_PART_SUPPLIED))
+ LT.warn(log, "Added event without checking if event is recordable: " +
+ U.gridEventName(EVT_CACHE_REBALANCE_PART_SUPPLIED));
+
+ List<GridCacheContext> caches = this.caches;
+
+ for (GridCacheContext cctx : caches)
+ if (!cctx.config().isEventsDisabled())
+ cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+ cctx.localNode(),
+ "Cache partition supplied event.",
+ EVT_CACHE_REBALANCE_PART_SUPPLIED,
+ part,
+ null,
+ 0,
+ 0));
+ }
+
+ /**
+ * Adds partition supply event.
+ *
+ * @param part Partition.
+ */
+ public void addRebalanceMissEvent(int part) {
+ if (!eventRecordable(EVT_CACHE_REBALANCE_PART_MISSED))
+ LT.warn(log, "Added event without checking if event is recordable: " +
+ U.gridEventName(EVT_CACHE_REBALANCE_PART_MISSED));
+
+ List<GridCacheContext> caches = this.caches;
+
+ for (GridCacheContext cctx : caches)
+ if (!cctx.config().isEventsDisabled())
+ cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+ cctx.localNode(),
+ "Cache partition missed event.",
+ EVT_CACHE_REBALANCE_PART_MISSED,
+ part,
+ null,
+ 0,
+ 0));
+ }
+
+ /**
* @param part Partition.
* @param key Key.
* @param evtNodeId Event node ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 514f8fd..7f52856 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -54,6 +54,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_MISSED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
/**
@@ -365,6 +367,9 @@ class GridDhtPartitionSupplier {
remainingParts.remove(part);
+ if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_MISSED))
+ grp.addRebalanceMissEvent(part);
+
if (log.isDebugEnabled())
log.debug("Requested partition is marked as missing ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + ", p=" + part + "]");
@@ -392,6 +397,9 @@ class GridDhtPartitionSupplier {
supplyMsg.last(part, loc.updateCounter());
remainingParts.remove(part);
+
+ if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_SUPPLIED))
+ grp.addRebalanceSupplyEvent(part);
}
}
@@ -409,11 +417,17 @@ class GridDhtPartitionSupplier {
supplyMsg.last(p, loc.updateCounter());
remainingIter.remove();
+
+ if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_SUPPLIED))
+ grp.addRebalanceSupplyEvent(p);
}
else if (iter.isPartitionMissing(p)) {
supplyMsg.missed(p);
remainingIter.remove();
+
+ if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_MISSED))
+ grp.addRebalanceMissEvent(p);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 042e0ea..ff420a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -311,7 +311,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
log.debug("Owning partition as there are no other owners: " + part);
}
else {
- ClusterNode n = picked.get(0);
+ ClusterNode n = picked.get(p % picked.size());
GridDhtPartitionDemandMessage msg = assignments.get(n);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedSupplyEventsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedSupplyEventsSelfTest.java
new file mode 100644
index 0000000..0d974fa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedSupplyEventsSelfTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+
+/**
+ */
+public class GridCachePartitionedSupplyEventsSelfTest extends GridCommonAbstractTest {
+ /** Default cache name with cache events disabled. */
+ private static final String DEFAULT_CACHE_NAME_EVTS_DISABLED = DEFAULT_CACHE_NAME + "EvtsDisabled";
+
+ /** */
+ public static final int NODES = 7;
+
+ /** */
+ public static final int PARTS = 8192;
+
+ /** */
+ private final ConcurrentHashMap<UUID, Integer> nodesToPartsSupplied = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ CacheConfiguration<?, ?> ccfg = cacheConfiguration();
+
+ CacheConfiguration<?, ?> ccfgEvtsDisabled = new CacheConfiguration<>(ccfg);
+
+ ccfgEvtsDisabled.setName(DEFAULT_CACHE_NAME_EVTS_DISABLED);
+ ccfgEvtsDisabled.setEventsDisabled(true);
+
+ cfg.setCacheConfiguration(ccfg, ccfgEvtsDisabled);
+
+ cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED, EventType.EVT_CACHE_REBALANCE_PART_MISSED);
+
+ Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap();
+
+ lsnrs.put(new IgnitePredicate<CacheRebalancingEvent>() {
+ @Override public boolean apply(CacheRebalancingEvent evt) {
+ nodesToPartsSupplied.compute(evt.node().id(), (k, v) -> (v == null) ? 1 : (v + 1));
+
+ assertEquals(DEFAULT_CACHE_NAME, evt.cacheName());
+
+ return true;
+ }
+ }, new int[]{EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED});
+
+ lsnrs.put(new IgnitePredicate<CacheRebalancingEvent>() {
+ @Override public boolean apply(CacheRebalancingEvent evt) {
+ //fail("Should not miss any partitions!");
+ log.warning("Missed partition " + evt.partition() + " from node " + evt.node().consistentId());
+
+ assertEquals(DEFAULT_CACHE_NAME, evt.cacheName());
+
+ return true;
+ }
+ }, new int[]{EventType.EVT_CACHE_REBALANCE_PART_MISSED});
+
+ cfg.setLocalEventListeners(lsnrs);
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ protected CacheConfiguration cacheConfiguration() {
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+ cacheCfg.setCacheMode(PARTITIONED);
+ cacheCfg.setRebalanceMode(SYNC);
+ cacheCfg.setAffinity(new RendezvousAffinityFunction(false, PARTS));
+ cacheCfg.setBackups(3);
+ return cacheCfg;
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testSupplyEvents() throws Exception {
+ Ignite g = startGrid("g0");
+
+ IgniteCache c1 = g.cache(DEFAULT_CACHE_NAME);
+ IgniteCache c0 = g.cache(DEFAULT_CACHE_NAME_EVTS_DISABLED);
+
+ for (int k = 0; k < PARTS * 2; k++) {
+ c1.put(k, k);
+ c0.put(k, k);
+ }
+
+ for (int n = 1; n <= NODES; n++) {
+ assertTrue(nodesToPartsSupplied.isEmpty());
+
+ startGrid("g" + n);
+
+ int max = 0;
+ int min = PARTS;
+ int total = 0;
+
+ for (int supplied : nodesToPartsSupplied.values()) {
+ assertTrue(supplied > 0);
+
+ max = Math.max(max, supplied);
+ min = Math.min(min, supplied);
+
+ total += supplied;
+ }
+
+ log.info("After start-up of node " + n + " each node supplied " + min + "-" + max + " partitions, total of " + total);
+
+ assertEquals(n, nodesToPartsSupplied.size());
+
+ assertTrue(max > 0);
+ assertTrue(total > 0);
+
+ // Until we have node 5, data is replicated
+ assertTrue(total == PARTS != n >= 4);
+
+ // Quality of distribution of supplied partitions
+ assertTrue(max - min < PARTS >> 4);
+
+ nodesToPartsSupplied.clear();
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
index c7467e7..4f4bd81 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColo
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxSingleThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtAtomicEvictionNearReadersSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtPreloadOnheapSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedSupplyEventsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedUnloadEventsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedBackupNodeFailureRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearEvictionEventSelfTest;
@@ -160,6 +161,7 @@ public class IgniteCacheMvccTestSuite2 {
ignoredTests.add(IgniteCacheServerNodeConcurrentStart.class);
ignoredTests.add(GridCachePartitionedUnloadEventsSelfTest.class);
+ ignoredTests.add(GridCachePartitionedSupplyEventsSelfTest.class);
ignoredTests.add(IgniteNoCustomEventsOnNodeStart.class);
ignoredTests.add(CacheExchangeMessageDuplicatedStateTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 4d271d2..5d67634 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtPreloadStartStopSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtPreloadUnloadSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledLockSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedSupplyEventsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedTopologyChangeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedUnloadEventsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheClearDuringRebalanceTest;
@@ -262,6 +263,7 @@ public class IgniteCacheTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, GridCachePartitionedTopologyChangeSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCachePartitionedUnloadEventsSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, GridCachePartitionedSupplyEventsSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheColocatedOptimisticTransactionSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheAtomicMessageCountSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheNearPartitionedClearSelfTest.class, ignoredTests);