You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/17 15:59:11 UTC
[16/23] ignite git commit: IGNITE-8210 Fixed custom event handling
for baseline topology change - Fixes #3814.
IGNITE-8210 Fixed custom event handling for baseline topology change - Fixes #3814.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d79c6409
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d79c6409
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d79c6409
Branch: refs/heads/ignite-7708
Commit: d79c6409bcb8ca3170ce9153db486cac2c537fc4
Parents: 7731669
Author: Sergey Chugunov <se...@gmail.com>
Authored: Tue Apr 17 14:28:47 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Apr 17 14:28:47 2018 +0300
----------------------------------------------------------------------
.../affinity/GridAffinityAssignmentCache.java | 2 +-
.../distributed/CacheBaselineTopologyTest.java | 94 ++++++++++++++++++++
2 files changed, 95 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d79c6409/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index b1899e3..9d5ce05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -322,7 +322,7 @@ public class GridAffinityAssignmentCache {
for (DiscoveryEvent event : events.events()) {
boolean affinityNode = CU.affinityNode(event.eventNode(), nodeFilter);
- if (affinityNode) {
+ if (affinityNode || event.type() == EVT_DISCOVERY_CUSTOM_EVT) {
skipCalculation = false;
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d79c6409/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
index 26502ed..0d59a2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -54,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -81,6 +83,12 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
private boolean delayRebalance;
/** */
+ private Map<String, Object> userAttrs;
+
+ /** */
+ private static final String DATA_NODE = "dataNodeUserAttr";
+
+ /** */
private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** {@inheritDoc} */
@@ -129,6 +137,9 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
.setWalMode(WALMode.LOG_ONLY)
);
+ if (userAttrs != null)
+ cfg.setUserAttributes(userAttrs);
+
if (client)
cfg.setClientMode(true);
@@ -139,6 +150,89 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
}
/**
+ * Verifies that rebalance on a cache with a node filter happens when the baseline topology changes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRebalanceForCacheWithNodeFilter() throws Exception {
+ try {
+ final int EMPTY_NODE_IDX = 2; // Index of the node that is started with DATA_NODE = false below.
+
+ userAttrs = U.newHashMap(1);
+ userAttrs.put(DATA_NODE, true);
+
+ startGrids(2); // Nodes 0 and 1 are started with DATA_NODE = true.
+
+ userAttrs.put(DATA_NODE, false);
+
+ IgniteEx ignite = startGrid(2); // Node 2 is started with DATA_NODE = false.
+
+ ignite.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ IgniteCache<Integer, Integer> cache =
+ ignite.createCache(
+ new CacheConfiguration<Integer, Integer>()
+ .setName(CACHE_NAME)
+ .setCacheMode(PARTITIONED)
+ .setBackups(1)
+ .setPartitionLossPolicy(READ_ONLY_SAFE)
+ .setAffinity(new RendezvousAffinityFunction(32, null))
+ .setNodeFilter(new DataNodeFilter())
+ );
+
+ for (int k = 0; k < 10_000; k++)
+ cache.put(k, k);
+
+ Thread.sleep(500);
+
+ printSizesDataNodes(NODE_COUNT - 1, EMPTY_NODE_IDX); // Check sizes before the extra data node joins.
+
+ userAttrs.put(DATA_NODE, true);
+
+ startGrid(3); // Node 3 is started with DATA_NODE = true.
+
+ ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion()); // Set baseline from the current topology version.
+
+ awaitPartitionMapExchange();
+
+ Thread.sleep(500);
+
+ printSizesDataNodes(NODE_COUNT, EMPTY_NODE_IDX); // Check sizes again with the newly started node included.
+ }
+ finally {
+ userAttrs = null; // Configuration only applies user attributes when this field is non-null.
+ }
+ }
+
+ /** Asserts primary local sizes of {@code CACHE_NAME} on nodes with index below {@code nodesCnt}: zero on node {@code emptyNodeIdx}, non-zero on all others. */
+ private void printSizesDataNodes(int nodesCnt, int emptyNodeIdx) {
+ for (int i = 0; i < nodesCnt; i++) {
+ IgniteEx ig = grid(i);
+
+ int locSize = ig.cache(CACHE_NAME).localSize(CachePeekMode.PRIMARY); // Only primary entries are counted.
+
+ if (i == emptyNodeIdx)
+ assertEquals("Cache local size on "
+ + i
+ + " node is expected to be zero", 0, locSize);
+ else
+ assertTrue("Cache local size on "
+ + i
+ + " node is expected to be non zero", locSize > 0);
+ }
+ }
+
+ /** Node filter that returns the value of a node's {@code DATA_NODE} attribute. */
+ private static class DataNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode clusterNode) {
+ return clusterNode.attribute(DATA_NODE); // NOTE(review): presumably unboxes a Boolean; a node without the attribute would then fail here - confirm.
+ }
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testTopologyChangesWithFixedBaseline() throws Exception {