You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/01/23 08:40:05 UTC
[ignite] branch master updated: IGNITE-10878 Filter cache
descriptors received from grid if node is the first node in topology -
Fixes #5853.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6b9c247 IGNITE-10878 Filter cache descriptors received from grid if node is the first node in topology - Fixes #5853.
6b9c247 is described below
commit 6b9c24786972174c5a3bf125d2cfb0644cf10957
Author: Sergey Chugunov <se...@gmail.com>
AuthorDate: Wed Jan 23 11:20:46 2019 +0300
IGNITE-10878 Filter cache descriptors received from grid if node is the first node in topology - Fixes #5853.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../processors/cache/ClusterCachesInfo.java | 56 ++++++
.../processors/cache/GridCacheProcessor.java | 10 +
...gniteDiscoveryDataHandlingInNewClusterTest.java | 220 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite4.java | 2 +
4 files changed, 288 insertions(+)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index e3b67f2..7d5816a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -133,6 +134,61 @@ class ClusterCachesInfo {
}
/**
+ * Filters all dynamic cache descriptors and groups that were not presented on node start
+ * and were received with grid discovery data.
+ *
+ * @param localConfigData node's local cache configurations
+ * (both from static config and stored with persistent caches).
+ *
+ */
+ public void filterDynamicCacheDescriptors(CacheJoinNodeDiscoveryData localConfigData) {
+ if (ctx.isDaemon())
+ return;
+
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = localConfigData.caches();
+
+ Iterator<Map.Entry<String, DynamicCacheDescriptor>> cachesIter = registeredCaches.entrySet().iterator();
+
+ while (cachesIter.hasNext()) {
+ Map.Entry<String, DynamicCacheDescriptor> e = cachesIter.next();
+
+ if (!caches.containsKey(e.getKey())) {
+ cachesIter.remove();
+
+ ctx.discovery().removeCacheFilter(e.getKey());
+ }
+ }
+
+ Iterator<Map.Entry<Integer, CacheGroupDescriptor>> grpsIter = registeredCacheGrps.entrySet().iterator();
+
+ while (grpsIter.hasNext()) {
+ Map.Entry<Integer, CacheGroupDescriptor> e = grpsIter.next();
+
+ boolean removeGrp = true;
+
+ for (DynamicCacheDescriptor cacheDescr : registeredCaches.values()) {
+ if (cacheDescr.groupId() == e.getKey()) {
+ removeGrp = false;
+
+ break;
+ }
+ }
+
+ if (removeGrp) {
+ grpsIter.remove();
+
+ ctx.discovery().removeCacheGroup(e.getValue());
+ }
+ }
+
+ locJoinCachesCtx = new LocalJoinCachesContext(
+ locJoinCachesCtx.caches(),
+ locJoinCachesCtx.initCaches(),
+ registeredCacheGrps,
+ registeredCaches);
+ }
+
+ /**
* @param joinDiscoData Information about configured caches and templates.
* @throws IgniteCheckedException If configuration validation failed.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 933a9e3..84dfe37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -284,6 +284,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Tmp storage for meta migration. */
private MetaStorage.TmpStorage tmpStorage;
+ /** Node's local cache configurations (both from static configuration and from persistent caches). */
+ private CacheJoinNodeDiscoveryData localConfigs;
+
/**
* @param ctx Kernal context.
*/
@@ -783,6 +786,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
startAllCachesOnClientStart()
);
+ localConfigs = discoData;
+
cachesInfo.onStart(discoData);
}
@@ -1982,6 +1987,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Caches to be started when this node starts.
*/
@Nullable public LocalJoinCachesContext localJoinCachesContext() {
+ if (ctx.discovery().localNode().order() == 1 && localConfigs != null)
+ cachesInfo.filterDynamicCacheDescriptors(localConfigs);
+
+ localConfigs = null;
+
return cachesInfo.localJoinCachesContext();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDiscoveryDataHandlingInNewClusterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDiscoveryDataHandlingInNewClusterTest.java
new file mode 100644
index 0000000..2279751
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDiscoveryDataHandlingInNewClusterTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ *
+ */
+@RunWith(JUnit4.class)
+public class IgniteDiscoveryDataHandlingInNewClusterTest extends GridCommonAbstractTest {
+ /** */
+ private static final String NODE_1_CONS_ID = "node01";
+
+ /** */
+ private static final String NODE_2_CONS_ID = "node02";
+
+ /** */
+ private static final String NODE_3_CONS_ID = "node03";
+
+ /** */
+ private static final String STATIC_CACHE_NAME = "staticCache";
+
+ /** */
+ private static final String DYNAMIC_CACHE_NAME_1 = "dynamicCache1";
+
+ /** */
+ private static final String DYNAMIC_CACHE_NAME_2 = "dynamicCache2";
+
+ /** Group where static and dynamic caches reside. */
+ private static final String GROUP_WITH_STATIC_CACHES = "group1";
+
+ /** Group where only dynamic caches reside. */
+ private static final String GROUP_WITH_DYNAMIC_CACHES = "group2";
+
+ /** Node filter to pin dynamic caches to a specific node. */
+ private static final IgnitePredicate<ClusterNode> nodeFilter = new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return node.consistentId().toString().contains(NODE_1_CONS_ID);
+ }
+ };
+
+ /** Discovery SPI aimed to fail node with it when another server node joins the topology. */
+ private TcpDiscoverySpi failingOnNodeJoinSpi = new TcpDiscoverySpi() {
+ @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ super.startMessageProcess(msg);
+
+ throw new RuntimeException("Simulation of failure of node " + NODE_1_CONS_ID);
+ }
+
+ super.startMessageProcess(msg);
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (igniteInstanceName.contains(NODE_1_CONS_ID)) {
+ failingOnNodeJoinSpi.setIpFinder(sharedStaticIpFinder);
+ failingOnNodeJoinSpi.setJoinTimeout(60_000);
+
+ cfg.setDiscoverySpi(failingOnNodeJoinSpi);
+ }
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ if (igniteInstanceName.contains("client"))
+ cfg.setClientMode(true);
+ else {
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setInitialSize(10500000)
+ .setMaxSize(6659883008L)
+ .setPersistenceEnabled(false)
+ )
+ );
+ }
+
+ CacheConfiguration staticCacheCfg = new CacheConfiguration(STATIC_CACHE_NAME)
+ .setGroupName(GROUP_WITH_STATIC_CACHES)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ .setNodeFilter(nodeFilter);
+
+ cfg.setCacheConfiguration(staticCacheCfg);
+
+ return cfg;
+ }
+
+ /**
+ * Verifies that new node received discovery data from stopped grid filters it out
+ * from GridCacheProcessor and GridDiscoveryManager internal structures.
+ *
+ * All subsequent servers and clients join topology successfully.
+ *
+ * See related ticket <a href="https://issues.apache.org/jira/browse/IGNITE-10878">IGNITE-10878</a>.
+ */
+ @Test
+ public void testNewClusterFiltersDiscoveryDataReceivedFromStoppedCluster() throws Exception {
+ IgniteEx ig0 = startGrid(NODE_1_CONS_ID);
+
+ prepareDynamicCaches(ig0);
+
+ IgniteEx ig1 = startGrid(NODE_2_CONS_ID);
+
+ verifyCachesAndGroups(ig1);
+
+ IgniteEx ig2 = startGrid(NODE_3_CONS_ID);
+
+ verifyCachesAndGroups(ig2);
+
+ IgniteEx client = startGrid("client01");
+
+ verifyCachesAndGroups(client);
+ }
+
+ /** */
+ private void verifyCachesAndGroups(IgniteEx ignite) {
+ Map<String, DynamicCacheDescriptor> caches = ignite.context().cache().cacheDescriptors();
+
+ assertEquals(2, caches.size());
+ caches.keySet().contains(GridCacheUtils.UTILITY_CACHE_NAME);
+ caches.keySet().contains(STATIC_CACHE_NAME);
+
+ Map<Integer, CacheGroupDescriptor> groups = ignite.context().cache().cacheGroupDescriptors();
+
+ assertEquals(2, groups.size());
+
+ boolean defaultGroupFound = false;
+ boolean staticCachesGroupFound = false;
+
+ for (CacheGroupDescriptor grpDesc : groups.values()) {
+ if (grpDesc.cacheOrGroupName().equals(GridCacheUtils.UTILITY_CACHE_NAME))
+ defaultGroupFound = true;
+ else if (grpDesc.cacheOrGroupName().equals(GROUP_WITH_STATIC_CACHES))
+ staticCachesGroupFound = true;
+ }
+
+ assertTrue(String.format("Default group found: %b, static group found: %b",
+ defaultGroupFound,
+ staticCachesGroupFound),
+ defaultGroupFound && staticCachesGroupFound);
+ }
+
+ /** */
+ private void prepareDynamicCaches(IgniteEx ig) {
+ ig.getOrCreateCache(new CacheConfiguration<>(DYNAMIC_CACHE_NAME_1)
+ .setGroupName(GROUP_WITH_STATIC_CACHES)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ .setNodeFilter(nodeFilter)
+ );
+
+ ig.getOrCreateCache(new CacheConfiguration<>(DYNAMIC_CACHE_NAME_2)
+ .setGroupName(GROUP_WITH_DYNAMIC_CACHES)
+ .setAffinity(new RendezvousAffinityFunction(false, 16))
+ .setNodeFilter((IgnitePredicate<ClusterNode>)node -> node.consistentId().toString().contains(NODE_1_CONS_ID))
+ );
+ }
+
+ /**
+ * Turns off printing stack trace on detecting critical failure to speed up tests.
+ */
+ @BeforeClass
+ public static void setUpClass() {
+ System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "false");
+ }
+
+ /**
+ * Restoring default value for printing stack trace setting.
+ */
+ @AfterClass
+ public static void tearDownClass() {
+ System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "true");
+ }
+
+ /** {@inheritDoc} */
+ @After
+ @Override public void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index a1a2f42..6cfa534 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheTxPreloadNoWriteTe
import org.apache.ignite.internal.processors.cache.IgniteCacheTxReplicatedPeekModesTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxStoreValueTest;
import org.apache.ignite.internal.processors.cache.IgniteClientCacheInitializationFailTest;
+import org.apache.ignite.internal.processors.cache.IgniteDiscoveryDataHandlingInNewClusterTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheFilterTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheMultinodeTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartCoordinatorFailoverTest;
@@ -258,6 +259,7 @@ public class IgniteCacheTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, IgniteCacheCreatePutTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheStartOnJoinTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheStartTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgniteDiscoveryDataHandlingInNewClusterTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheDiscoveryDataConcurrentJoinTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteClientCacheInitializationFailTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheFailedUpdateResponseTest.class, ignoredTests);