You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/17 15:59:11 UTC

[16/23] ignite git commit: IGNITE-8210 Fixed custom event handling for baseline topology change - Fixes #3814.

IGNITE-8210 Fixed custom event handling for baseline topology change - Fixes #3814.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d79c6409
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d79c6409
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d79c6409

Branch: refs/heads/ignite-7708
Commit: d79c6409bcb8ca3170ce9153db486cac2c537fc4
Parents: 7731669
Author: Sergey Chugunov <se...@gmail.com>
Authored: Tue Apr 17 14:28:47 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Apr 17 14:28:47 2018 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |  2 +-
 .../distributed/CacheBaselineTopologyTest.java  | 94 ++++++++++++++++++++
 2 files changed, 95 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d79c6409/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index b1899e3..9d5ce05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -322,7 +322,7 @@ public class GridAffinityAssignmentCache {
             for (DiscoveryEvent event : events.events()) {
                 boolean affinityNode = CU.affinityNode(event.eventNode(), nodeFilter);
 
-                if (affinityNode) {
+                if (affinityNode || event.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                     skipCalculation = false;
 
                     break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d79c6409/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
index 26502ed..0d59a2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -54,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -81,6 +83,12 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
     private boolean delayRebalance;
 
     /** */
+    private Map<String, Object> userAttrs;
+
+    /** Name of the user attribute whose value {@link DataNodeFilter} returns as its filtering decision. */
+    private static final String DATA_NODE = "dataNodeUserAttr";
+
+    /** */
     private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** {@inheritDoc} */
@@ -129,6 +137,9 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
             .setWalMode(WALMode.LOG_ONLY)
         );
 
+        if (userAttrs != null)
+            cfg.setUserAttributes(userAttrs);
+
         if (client)
             cfg.setClientMode(true);
 
@@ -139,6 +150,89 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Verifies that rebalance on a cache with a node filter happens when BaselineTopology changes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRebalanceForCacheWithNodeFilter() throws Exception {
+        try {
+            final int EMPTY_NODE_IDX = 2;
+
+            userAttrs = U.newHashMap(1);
+            userAttrs.put(DATA_NODE, true); // Grids 0 and 1 are accepted by DataNodeFilter.
+
+            startGrids(2);
+
+            userAttrs.put(DATA_NODE, false); // Grid 2 is rejected by DataNodeFilter.
+
+            IgniteEx ignite = startGrid(2);
+
+            ignite.cluster().active(true);
+
+            awaitPartitionMapExchange();
+
+            IgniteCache<Integer, Integer> cache =
+                ignite.createCache(
+                    new CacheConfiguration<Integer, Integer>()
+                        .setName(CACHE_NAME)
+                        .setCacheMode(PARTITIONED)
+                        .setBackups(1)
+                        .setPartitionLossPolicy(READ_ONLY_SAFE)
+                        .setAffinity(new RendezvousAffinityFunction(32, null))
+                        .setNodeFilter(new DataNodeFilter())
+                );
+
+            for (int k = 0; k < 10_000; k++)
+                cache.put(k, k);
+
+            Thread.sleep(500);
+
+            printSizesDataNodes(NODE_COUNT - 1, EMPTY_NODE_IDX);
+
+            userAttrs.put(DATA_NODE, true); // Grid 3 is accepted by DataNodeFilter.
+
+            startGrid(3);
+
+            ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+            awaitPartitionMapExchange();
+
+            Thread.sleep(500);
+
+            printSizesDataNodes(NODE_COUNT, EMPTY_NODE_IDX);
+        }
+        finally {
+            userAttrs = null; // Do not leak the attributes into other tests of this class.
+        }
+    }
+
+    /** Asserts that among grids {@code [0, nodesCnt)} only grid {@code emptyNodeIdx} has zero primary local size. */
+    private void printSizesDataNodes(int nodesCnt, int emptyNodeIdx) {
+        for (int i = 0; i < nodesCnt; i++) {
+            IgniteEx ig = grid(i);
+
+            int locSize = ig.cache(CACHE_NAME).localSize(CachePeekMode.PRIMARY); // Primary entries only.
+
+            if (i == emptyNodeIdx)
+                assertEquals("Cache local size on "
+                    + i
+                    + " node is expected to be zero", 0, locSize);
+            else
+                assertTrue("Cache local size on "
+                    + i
+                    + " node is expected to be non zero", locSize > 0);
+        }
+    }
+
+    /** Node filter that accepts a node when its {@code DATA_NODE} user attribute is {@code true}. */
+    private static class DataNodeFilter implements IgnitePredicate<ClusterNode> {
+
+        @Override public boolean apply(ClusterNode clusterNode) {
+            return clusterNode.attribute(DATA_NODE); // NOTE(review): presumably NPE on unboxing if attribute is absent - confirm.
+        }
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testTopologyChangesWithFixedBaseline() throws Exception {