You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/01/23 08:40:05 UTC

[ignite] branch master updated: IGNITE-10878 Filter cache descriptors received from grid if node is the first node in topology - Fixes #5853.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b9c247  IGNITE-10878 Filter cache descriptors received from grid if node is the first node in topology - Fixes #5853.
6b9c247 is described below

commit 6b9c24786972174c5a3bf125d2cfb0644cf10957
Author: Sergey Chugunov <se...@gmail.com>
AuthorDate: Wed Jan 23 11:20:46 2019 +0300

    IGNITE-10878 Filter cache descriptors received from grid if node is the first node in topology - Fixes #5853.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../processors/cache/ClusterCachesInfo.java        |  56 ++++++
 .../processors/cache/GridCacheProcessor.java       |  10 +
 ...gniteDiscoveryDataHandlingInNewClusterTest.java | 220 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite4.java   |   2 +
 4 files changed, 288 insertions(+)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index e3b67f2..7d5816a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -133,6 +134,61 @@ class ClusterCachesInfo {
     }
 
     /**
+     * Filters all dynamic cache descriptors and groups that were not presented on node start
+     * and were received with grid discovery data.
+     *
+     * @param localConfigData node's local cache configurations
+     * (both from static config and stored with persistent caches).
+     *
+     */
+    public void filterDynamicCacheDescriptors(CacheJoinNodeDiscoveryData localConfigData) {
+        if (ctx.isDaemon())
+            return;
+
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = localConfigData.caches();
+
+        Iterator<Map.Entry<String, DynamicCacheDescriptor>> cachesIter = registeredCaches.entrySet().iterator();
+
+        while (cachesIter.hasNext()) {
+            Map.Entry<String, DynamicCacheDescriptor> e = cachesIter.next();
+
+            if (!caches.containsKey(e.getKey())) {
+                cachesIter.remove();
+
+                ctx.discovery().removeCacheFilter(e.getKey());
+            }
+        }
+
+        Iterator<Map.Entry<Integer, CacheGroupDescriptor>> grpsIter = registeredCacheGrps.entrySet().iterator();
+
+        while (grpsIter.hasNext()) {
+            Map.Entry<Integer, CacheGroupDescriptor> e = grpsIter.next();
+
+            boolean removeGrp = true;
+
+            for (DynamicCacheDescriptor cacheDescr : registeredCaches.values()) {
+                if (cacheDescr.groupId() == e.getKey()) {
+                    removeGrp = false;
+
+                    break;
+                }
+            }
+
+            if (removeGrp) {
+                grpsIter.remove();
+
+                ctx.discovery().removeCacheGroup(e.getValue());
+            }
+        }
+
+        locJoinCachesCtx = new LocalJoinCachesContext(
+            locJoinCachesCtx.caches(),
+            locJoinCachesCtx.initCaches(),
+            registeredCacheGrps,
+            registeredCaches);
+    }
+
+    /**
      * @param joinDiscoData Information about configured caches and templates.
      * @throws IgniteCheckedException If configuration validation failed.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 933a9e3..84dfe37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -284,6 +284,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Tmp storage for meta migration. */
     private MetaStorage.TmpStorage tmpStorage;
 
+    /** Node's local cache configurations (both from static configuration and from persistent caches). */
+    private CacheJoinNodeDiscoveryData localConfigs;
+
     /**
      * @param ctx Kernal context.
      */
@@ -783,6 +786,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             startAllCachesOnClientStart()
         );
 
+        localConfigs = discoData;
+
         cachesInfo.onStart(discoData);
     }
 
@@ -1982,6 +1987,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Caches to be started when this node starts.
      */
     @Nullable public LocalJoinCachesContext localJoinCachesContext() {
+        if (ctx.discovery().localNode().order() == 1 && localConfigs != null)
+            cachesInfo.filterDynamicCacheDescriptors(localConfigs);
+
+        localConfigs = null;
+
         return cachesInfo.localJoinCachesContext();
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDiscoveryDataHandlingInNewClusterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDiscoveryDataHandlingInNewClusterTest.java
new file mode 100644
index 0000000..2279751
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDiscoveryDataHandlingInNewClusterTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Checks that the first node of a new cluster drops dynamic caches and groups received with discovery data.
+ */
+@RunWith(JUnit4.class)
+public class IgniteDiscoveryDataHandlingInNewClusterTest extends GridCommonAbstractTest {
+    /** Name of the first node; it gets the failing discovery SPI and is matched by {@link #nodeFilter}. */
+    private static final String NODE_1_CONS_ID = "node01";
+
+    /** Name of the second server node. */
+    private static final String NODE_2_CONS_ID = "node02";
+
+    /** Name of the third server node. */
+    private static final String NODE_3_CONS_ID = "node03";
+
+    /** Name of the cache statically configured on every node. */
+    private static final String STATIC_CACHE_NAME = "staticCache";
+
+    /** Name of the dynamic cache created in the group of the static cache. */
+    private static final String DYNAMIC_CACHE_NAME_1 = "dynamicCache1";
+
+    /** Name of the dynamic cache created in its own group. */
+    private static final String DYNAMIC_CACHE_NAME_2 = "dynamicCache2";
+
+    /** Group where static and dynamic caches reside. */
+    private static final String GROUP_WITH_STATIC_CACHES = "group1";
+
+    /** Group where only dynamic caches reside. */
+    private static final String GROUP_WITH_DYNAMIC_CACHES = "group2";
+
+    /** Node filter pinning caches to the node whose consistent ID contains {@link #NODE_1_CONS_ID}. */
+    private static final IgnitePredicate<ClusterNode> nodeFilter = new IgnitePredicate<ClusterNode>() {
+        @Override public boolean apply(ClusterNode node) {
+            return node.consistentId().toString().contains(NODE_1_CONS_ID);
+        }
+    };
+
+    /** Discovery SPI that fails its own node right after processing a node-added message of a joining node. */
+    private TcpDiscoverySpi failingOnNodeJoinSpi = new TcpDiscoverySpi() {
+        @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                super.startMessageProcess(msg);
+
+                throw new RuntimeException("Simulation of failure of node " + NODE_1_CONS_ID);
+            }
+
+            super.startMessageProcess(msg);
+        }
+    };
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (igniteInstanceName.contains(NODE_1_CONS_ID)) {
+            failingOnNodeJoinSpi.setIpFinder(sharedStaticIpFinder);
+            failingOnNodeJoinSpi.setJoinTimeout(60_000);
+
+            cfg.setDiscoverySpi(failingOnNodeJoinSpi);
+        }
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        if (igniteInstanceName.contains("client"))
+            cfg.setClientMode(true);
+        else {
+            cfg.setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(
+                        new DataRegionConfiguration()
+                            .setInitialSize(10500000)
+                            .setMaxSize(6659883008L)
+                            .setPersistenceEnabled(false)
+                    )
+            );
+        }
+
+        CacheConfiguration staticCacheCfg = new CacheConfiguration(STATIC_CACHE_NAME)
+            .setGroupName(GROUP_WITH_STATIC_CACHES)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+            .setNodeFilter(nodeFilter);
+
+        cfg.setCacheConfiguration(staticCacheCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Verifies that a new node which received discovery data from a stopped grid filters that data out
+     * of GridCacheProcessor and GridDiscoveryManager internal structures.
+     *
+     * All subsequent servers and clients join topology successfully.
+     *
+     * See related ticket <a href="https://issues.apache.org/jira/browse/IGNITE-10878">IGNITE-10878</a>.
+     */
+    @Test
+    public void testNewClusterFiltersDiscoveryDataReceivedFromStoppedCluster() throws Exception {
+        IgniteEx ig0 = startGrid(NODE_1_CONS_ID);
+
+        prepareDynamicCaches(ig0);
+
+        IgniteEx ig1 = startGrid(NODE_2_CONS_ID);
+
+        verifyCachesAndGroups(ig1);
+
+        IgniteEx ig2 = startGrid(NODE_3_CONS_ID);
+
+        verifyCachesAndGroups(ig2);
+
+        IgniteEx client = startGrid("client01");
+
+        verifyCachesAndGroups(client);
+    }
+
+    /** Asserts that {@code ignite} knows only the utility and static caches and only their two groups. */
+    private void verifyCachesAndGroups(IgniteEx ignite) {
+        Map<String, DynamicCacheDescriptor> caches = ignite.context().cache().cacheDescriptors();
+
+        assertEquals(2, caches.size());
+        assertTrue("Utility cache descriptor is missing", caches.containsKey(GridCacheUtils.UTILITY_CACHE_NAME));
+        assertTrue("Static cache descriptor is missing", caches.containsKey(STATIC_CACHE_NAME));
+
+        Map<Integer, CacheGroupDescriptor> groups = ignite.context().cache().cacheGroupDescriptors();
+
+        assertEquals(2, groups.size());
+
+        boolean defaultGroupFound = false;
+        boolean staticCachesGroupFound = false;
+
+        for (CacheGroupDescriptor grpDesc : groups.values()) {
+            if (grpDesc.cacheOrGroupName().equals(GridCacheUtils.UTILITY_CACHE_NAME))
+                defaultGroupFound = true;
+            else if (grpDesc.cacheOrGroupName().equals(GROUP_WITH_STATIC_CACHES))
+                staticCachesGroupFound = true;
+        }
+
+        assertTrue(String.format("Default group found: %b, static group found: %b",
+            defaultGroupFound,
+            staticCachesGroupFound),
+            defaultGroupFound && staticCachesGroupFound);
+    }
+
+    /** Starts two dynamic caches from {@code ig}: one in the static cache's group, one in a separate group. */
+    private void prepareDynamicCaches(IgniteEx ig) {
+        ig.getOrCreateCache(new CacheConfiguration<>(DYNAMIC_CACHE_NAME_1)
+            .setGroupName(GROUP_WITH_STATIC_CACHES)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+            .setNodeFilter(nodeFilter)
+        );
+
+        ig.getOrCreateCache(new CacheConfiguration<>(DYNAMIC_CACHE_NAME_2)
+            .setGroupName(GROUP_WITH_DYNAMIC_CACHES)
+            .setAffinity(new RendezvousAffinityFunction(false, 16))
+            .setNodeFilter((IgnitePredicate<ClusterNode>)node -> node.consistentId().toString().contains(NODE_1_CONS_ID))
+        );
+    }
+
+    /**
+     * Turns off dumping threads on detecting critical failure to speed up tests.
+     */
+    @BeforeClass
+    public static void setUpClass() {
+        System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "false");
+    }
+
+    /**
+     * Sets the thread dump setting back to {@code true} once all tests of the class have run.
+     */
+    @AfterClass
+    public static void tearDownClass() {
+        System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "true");
+    }
+
+    /** {@inheritDoc} */
+    @After
+    @Override public void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index a1a2f42..6cfa534 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheTxPreloadNoWriteTe
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxReplicatedPeekModesTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxStoreValueTest;
 import org.apache.ignite.internal.processors.cache.IgniteClientCacheInitializationFailTest;
+import org.apache.ignite.internal.processors.cache.IgniteDiscoveryDataHandlingInNewClusterTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheFilterTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheMultinodeTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartCoordinatorFailoverTest;
@@ -258,6 +259,7 @@ public class IgniteCacheTestSuite4 {
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheCreatePutTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheStartOnJoinTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheStartTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, IgniteDiscoveryDataHandlingInNewClusterTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheDiscoveryDataConcurrentJoinTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteClientCacheInitializationFailTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheFailedUpdateResponseTest.class, ignoredTests);