You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2022/01/20 16:44:23 UTC

[ignite] branch master updated: IGNITE-16143 Adds an ability for plugins to provide topology validator as extension. (#9666)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f7869b  IGNITE-16143 Adds an ability for plugins to provide topology validator as extension. (#9666)
3f7869b is described below

commit 3f7869b9876d9987f53216850361a68482b699bf
Author: Mikhail Petrov <32...@users.noreply.github.com>
AuthorDate: Thu Jan 20 19:43:46 2022 +0300

    IGNITE-16143 Adds an ability for plugins to provide topology validator as extension. (#9666)
---
 .../processors/cache/CacheGroupContext.java        |  45 ++++-
 .../dht/GridDhtTopologyFutureAdapter.java          |   9 +-
 .../plugin/CacheTopologyValidatorProvider.java     |  42 +++++
 .../cache/CacheTopologyValidatorProviderTest.java  | 205 +++++++++++++++++++++
 .../IgniteTopologyValidatorAbstractCacheTest.java  |  98 ++++++++--
 ...TopologyValidatorAbstractTxCacheGroupsTest.java |   2 +
 ...IgniteTopologyValidatorAbstractTxCacheTest.java |   2 +
 ...teTopologyValidatorCacheGroupsAbstractTest.java |  66 ++++---
 .../IgniteTopologyValidatorTestSuite.java          |   2 +
 9 files changed, 428 insertions(+), 43 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index a7b85e3..6cadb65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
 import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -65,6 +66,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -186,6 +188,9 @@ public class CacheGroupContext {
     /** Cache group metrics. */
     private final CacheGroupMetricsImpl metrics;
 
+    /** Topology validators. */
+    private final Collection<TopologyValidator> topValidators;
+
     /**
      * @param ctx Context.
      * @param grpId Group ID.
@@ -261,6 +266,8 @@ public class CacheGroupContext {
         }
 
         hasAtomicCaches = ccfg.getAtomicityMode() == ATOMIC;
+
+        topValidators = Collections.unmodifiableCollection(topologyValidators(ccfg, ctx.kernalContext().plugins()));
     }
 
     /**
@@ -727,10 +734,10 @@ public class CacheGroupContext {
     }
 
     /**
-     * @return Configured topology validator.
+     * @return Configured topology validators.
      */
-    @Nullable public TopologyValidator topologyValidator() {
-        return ccfg.getTopologyValidator();
+    public Collection<TopologyValidator> topologyValidators() {
+        return topValidators;
     }
 
     /**
@@ -1297,4 +1304,36 @@ public class CacheGroupContext {
         if (statHolderIdx != IoStatisticsHolderNoOp.INSTANCE)
             ctx.kernalContext().metric().remove(statHolderIdx.metricRegistryName(), destroy);
     }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param plugins Ignite plugin processor.
+     * @return Topology validators for the cache group: the one from the cache configuration (if set) goes first,
+     * followed by the non-null validators supplied by registered {@link CacheTopologyValidatorProvider} extensions.
+     */
+    private Collection<TopologyValidator> topologyValidators(
+        CacheConfiguration<?, ?> ccfg,
+        IgnitePluginProcessor plugins
+    ) {
+        List<TopologyValidator> res = new ArrayList<>();
+
+        TopologyValidator ccfgTopValidator = ccfg.getTopologyValidator();
+
+        if (ccfgTopValidator != null)
+            res.add(ccfgTopValidator);
+
+        CacheTopologyValidatorProvider[] topValidatorProviders = plugins.extensions(CacheTopologyValidatorProvider.class);
+
+        if (F.isEmpty(topValidatorProviders))
+            return res;
+
+        for (CacheTopologyValidatorProvider topValidatorProvider : topValidatorProviders) {
+            TopologyValidator validator = topValidatorProvider.topologyValidator(cacheOrGroupName());
+
+            if (validator != null) // A provider may supply no validator for this cache group.
+                res.add(validator);
+        }
+
+        return res;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index 3f637c52..1f248a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -62,10 +62,13 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
         boolean valid = true;
 
         if (!grp.systemCache()) {
-            TopologyValidator validator = grp.topologyValidator();
+            for (TopologyValidator validator : grp.topologyValidators()) {
+                if (!validator.validate(topNodes)) {
+                    valid = false;
 
-            if (validator != null)
-                valid = validator.validate(topNodes);
+                    break;
+                }
+            }
         }
 
         return new CacheGroupValidation(valid, lostParts);
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/CacheTopologyValidatorProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/CacheTopologyValidatorProvider.java
new file mode 100644
index 0000000..c9361b7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/CacheTopologyValidatorProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin;
+
+import org.apache.ignite.configuration.TopologyValidator;
+
+/**
+ * The {@link CacheTopologyValidatorProvider} is used to pass implementations of {@link TopologyValidator} for a specific
+ * cache through the Ignite plugin extensions mechanism. Each cache group, when its context is created, iterates over
+ * all registered {@link CacheTopologyValidatorProvider}s and tries to obtain an instance of {@link TopologyValidator}
+ * by cache or cache group name. All obtained {@link TopologyValidator}s are saved in the cache group context and
+ * successively called during the topology validation process. The topology validation is passed only if all
+ * {@link TopologyValidator}s consider that the current topology is valid.
+ *
+ * @see TopologyValidator
+ * @see Extension
+ * @see PluginProvider#initExtensions(PluginContext, ExtensionRegistry)
+ */
+public interface CacheTopologyValidatorProvider extends Extension {
+    /**
+     * Provides an instance of {@link TopologyValidator} for the cache or cache group with the specified name.
+     *
+     * @param cacheName Name of the cache or cache group.
+     * @return Topology validator for the specified name, or {@code null} if this provider supplies none for it.
+     */
+    public TopologyValidator topologyValidator(String cacheName);
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTopologyValidatorProviderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTopologyValidatorProviderTest.java
new file mode 100644
index 0000000..1caad69
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTopologyValidatorProviderTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.internal.processors.cache.IgniteTopologyValidatorAbstractCacheTest.TestCacheTopologyValidatorPluginProvider;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+
+/** Tests topology validators supplied through {@link CacheTopologyValidatorProvider} plugin extensions. */
+public class CacheTopologyValidatorProviderTest extends GridCommonAbstractTest {
+    /** Builds configuration of the node with the given index, plugin providers and optional persistence. */
+    private IgniteConfiguration getConfiguration(
+        int idx,
+        boolean isPersistenceEnabled,
+        PluginProvider<?>... providers
+    ) throws Exception {
+        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(idx));
+
+        cfg.setPluginProviders(providers);
+
+        if (isPersistenceEnabled) {
+            cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setMaxSize(100 * (1 << 20)) // 100 MiB.
+                    .setPersistenceEnabled(true)));
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** Checks that validators of a persistent cache follow the plugin providers configured on each restart. */
+    @Test
+    public void testTopologyValidatorProviderWithPersistence() throws Exception {
+        startGrid(getConfiguration(0, true, new TestPluginProvider("top-validator", 0)));
+
+        grid(0).cluster().state(ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        grid(0).createCache(DEFAULT_CACHE_NAME);
+
+        checkCachePut(DEFAULT_CACHE_NAME, true);
+
+        stopAllGrids();
+
+        PluginProvider<?> pluginProvider = new TestPluginProvider("top-validator", 1);
+
+        startGrid(getConfiguration(0, true, pluginProvider));
+
+        grid(0).cluster().state(ACTIVE);
+
+        checkCachePut(DEFAULT_CACHE_NAME, false); // A single node does not exceed threshold 1.
+
+        assertEquals(0, grid(0).cache(DEFAULT_CACHE_NAME).get(0)); // Value put before the restart is still readable.
+
+        startGrid(getConfiguration(1, true, pluginProvider));
+
+        checkCachePut(DEFAULT_CACHE_NAME, true);
+
+        stopAllGrids();
+
+        startGrid(getConfiguration(0, true)); // No plugin providers this time.
+
+        grid(0).cluster().state(ACTIVE);
+
+        checkCachePut(DEFAULT_CACHE_NAME, true);
+    }
+
+    /** Checks that the cache configuration validator and the validators of several plugins must all pass. */
+    @Test
+    public void testCacheConfigurationValidatorAlongsidePluginValidators() throws Exception {
+        PluginProvider<?> firstPluginProvider = new TestPluginProvider("first-top-validator", 2);
+        PluginProvider<?> secondPluginProvider = new TestPluginProvider("second-top-validator", 3);
+
+        startGrid(getConfiguration(0, false, firstPluginProvider, secondPluginProvider));
+
+        grid(0).createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setTopologyValidator(new TestTopologyValidator()));
+
+        checkCachePut(DEFAULT_CACHE_NAME, false);
+
+        startGrid(getConfiguration(1, false, firstPluginProvider, secondPluginProvider));
+
+        checkCachePut(DEFAULT_CACHE_NAME, false);
+
+        startGrid(getConfiguration(2, false, firstPluginProvider, secondPluginProvider));
+
+        checkCachePut(DEFAULT_CACHE_NAME, false);
+
+        startGrid(getConfiguration(3, false, firstPluginProvider, secondPluginProvider));
+
+        checkCachePut(DEFAULT_CACHE_NAME, true); // Four nodes exceed the largest threshold (3).
+    }
+
+    /** Checks that a put from each started node succeeds or fails with {@link CacheInvalidStateException}. */
+    private void checkCachePut(String cacheName, boolean isSuccessExpected) {
+        for (Ignite ignite : G.allGrids()) {
+            if (isSuccessExpected) {
+                ignite.cache(cacheName).put(0, 0);
+
+                assertEquals(0, grid(0).cache(cacheName).get(0)); // The value is read back through node 0.
+            }
+            else {
+                GridTestUtils.assertThrows(
+                    log,
+                    () -> {
+                        ignite.cache(cacheName).put(0, 0);
+
+                        return null;
+                    },
+                    CacheInvalidStateException.class,
+                    "Failed to perform cache operation"
+                );
+            }
+        }
+    }
+
+    /** Validator that accepts the topology only if it contains more than one node. */
+    private static class TestTopologyValidator implements TopologyValidator {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean validate(Collection<ClusterNode> nodes) {
+            return nodes.size() > 1;
+        }
+    }
+
+    /** Plugin provider with a custom name that carries a threshold-based topology validator provider. */
+    private static class TestPluginProvider extends TestCacheTopologyValidatorPluginProvider {
+        /** Plugin name. */
+        private final String name;
+
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return name;
+        }
+
+        /** Creates provider named {@code name} whose validators require more than {@code validationThreshold} nodes. */
+        private TestPluginProvider(String name, int validationThreshold) {
+            super(new TestCacheTopologyValidatorProvider(validationThreshold));
+
+            this.name = name;
+        }
+
+        /** Supplies the same threshold-based validator regardless of the cache name. */
+        private static class TestCacheTopologyValidatorProvider implements CacheTopologyValidatorProvider {
+            /** Node count that the topology must exceed to be considered valid. */
+            private final int validationThreshold;
+
+            /** @param validationThreshold Node count that the topology must exceed to be considered valid. */
+            public TestCacheTopologyValidatorProvider(int validationThreshold) {
+                this.validationThreshold = validationThreshold;
+            }
+
+            /** {@inheritDoc} */
+            @Override public TopologyValidator topologyValidator(String cacheName) {
+                return nodes -> nodes.size() > validationThreshold;
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractCacheTest.java
index ee59493..d5ac52b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractCacheTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -30,13 +31,30 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TopologyValidator;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.PluginContext;
 import org.apache.ignite.transactions.Transaction;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Topology validator test.
  */
+@RunWith(Parameterized.class)
 public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCacheAbstractTest implements Serializable {
+    /** Whether validators are supplied via plugin extension ({@code true}) or cache configuration ({@code false}). */
+    @Parameterized.Parameter()
+    public Boolean isPluginTopValidatorProvider;
+
+    /** @return Test parameters: one run with {@code isPluginTopValidatorProvider} set and one with it unset. */
+    @Parameterized.Parameters(name = "isPluginTopValidatorProvider={0}")
+    public static Iterable<Object[]> data() {
+        return Arrays.asList(new Object[] {true}, new Object[] {false});
+    }
+
     /** key-value used at test. */
     private static String KEY_VAL = "1";
 
@@ -48,7 +66,19 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac
 
     /** {@inheritDoc} */
     @Override protected final int gridCount() {
-        return 1;
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
     }
 
     /** {@inheritDoc} */
@@ -66,20 +96,14 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac
 
             cfg.setCacheConfiguration(cCfg0, cCfg1, cCfg2);
 
-            for (CacheConfiguration cCfg : cfg.getCacheConfiguration()) {
-                if (cCfg.getName().equals(CACHE_NAME_1))
-                    cCfg.setTopologyValidator(new TopologyValidator() {
-                        @Override public boolean validate(Collection<ClusterNode> nodes) {
-                            return servers(nodes) == 2;
-                        }
-                    });
-                else if (cCfg.getName().equals(CACHE_NAME_2))
-                    cCfg.setTopologyValidator(new TopologyValidator() {
-                        @Override public boolean validate(Collection<ClusterNode> nodes) {
-                            return servers(nodes) >= 2;
-                        }
-                    });
+            TestCacheTopologyValidatorProvider topValidatorProvider = new TestCacheTopologyValidatorProvider();
+
+            if (!isPluginTopValidatorProvider) {
+                for (CacheConfiguration cCfg : cfg.getCacheConfiguration())
+                    cCfg.setTopologyValidator(topValidatorProvider.topologyValidator(cCfg.getName()));
             }
+            else
+                cfg.setPluginProviders(new TestCacheTopologyValidatorPluginProvider(topValidatorProvider));
         }
 
         return cfg;
@@ -238,6 +262,8 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac
      */
     @Test
     public void testTopologyValidator() throws Exception {
+        startGrid(0);
+
         putValid(DEFAULT_CACHE_NAME);
         remove(DEFAULT_CACHE_NAME);
 
@@ -281,4 +307,48 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac
         putValid(CACHE_NAME_2);
         remove(CACHE_NAME_2);
     }
+
+    /** Test plugin provider that registers the given {@link CacheTopologyValidatorProvider} as an extension. */
+    public static class TestCacheTopologyValidatorPluginProvider extends AbstractTestPluginProvider {
+        /** Topology validator provider to register. */
+        private final CacheTopologyValidatorProvider provider;
+
+        /** @param provider Topology validator provider to register. */
+        public TestCacheTopologyValidatorPluginProvider(CacheTopologyValidatorProvider provider) {
+            this.provider = provider;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return "CacheTopologyValidatorProviderPlugin";
+        }
+
+        /** {@inheritDoc} */
+        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, provider);
+        }
+    }
+
+    /** Provides validators only for {@code CACHE_NAME_1} and {@code CACHE_NAME_2}; other caches get {@code null}. */
+    private static class TestCacheTopologyValidatorProvider implements CacheTopologyValidatorProvider, Serializable {
+        /** {@inheritDoc} */
+        @Override public TopologyValidator topologyValidator(String cacheName) {
+            if (CACHE_NAME_1.equals(cacheName)) {
+                return new TopologyValidator() {
+                    @Override public boolean validate(Collection<ClusterNode> nodes) {
+                        return servers(nodes) == 2;
+                    }
+                };
+            }
+            else if (CACHE_NAME_2.equals(cacheName)) {
+                return new TopologyValidator() {
+                    @Override public boolean validate(Collection<ClusterNode> nodes) {
+                        return servers(nodes) >= 2;
+                    }
+                };
+            }
+            else
+                return null;
+        }
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractTxCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractTxCacheGroupsTest.java
index 8bd5091..e270174 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractTxCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractTxCacheGroupsTest.java
@@ -46,6 +46,8 @@ public abstract class IgniteTopologyValidatorAbstractTxCacheGroupsTest
     /** {@inheritDoc} */
     @Test
     @Override public void testTopologyValidator() throws Exception {
+        startGrid(0);
+
         try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
             putInvalid(CACHE_NAME_1);
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractTxCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractTxCacheTest.java
index e53b5a2..0fa3aa4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractTxCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractTxCacheTest.java
@@ -45,6 +45,8 @@ public abstract class IgniteTopologyValidatorAbstractTxCacheTest extends IgniteT
     /** {@inheritDoc} */
     @Test
     @Override public void testTopologyValidator() throws Exception {
+        startGrid(0);
+
         try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
             putInvalid(CACHE_NAME_1);
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorCacheGroupsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorCacheGroupsAbstractTest.java
index a152e66..7392e93 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorCacheGroupsAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorCacheGroupsAbstractTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.io.Serializable;
 import java.util.Collection;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TopologyValidator;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
 import org.junit.Test;
 
 /**
@@ -45,36 +47,29 @@ public abstract class IgniteTopologyValidatorCacheGroupsAbstractTest extends Ign
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration icfg = super.getConfiguration(igniteInstanceName);
 
-        CacheConfiguration[] ccfgs = icfg.getCacheConfiguration();
-
-        TopologyValidator val1 = new TopologyValidator() {
-            @Override public boolean validate(Collection<ClusterNode> nodes) {
-                return nodes.size() == 2;
-            }
-        };
-
-        TopologyValidator val2 = new TopologyValidator() {
-            @Override public boolean validate(Collection<ClusterNode> nodes) {
-                return nodes.size() >= 2;
-            }
-        };
+        CacheConfiguration[] ccfgs = F.concat(
+            icfg.getCacheConfiguration(),
+            cacheConfiguration(igniteInstanceName).setName(CACHE_NAME_3),
+            cacheConfiguration(igniteInstanceName).setName(CACHE_NAME_4)
+        );
 
         for (CacheConfiguration ccfg : ccfgs) {
             if (CACHE_NAME_1.equals(ccfg.getName()) || CACHE_NAME_2.equals(ccfg.getName()))
-                ccfg.setGroupName(GROUP_1).setTopologyValidator(val1);
+                ccfg.setGroupName(GROUP_1);
+            else if (CACHE_NAME_3.equals(ccfg.getName()) || CACHE_NAME_4.equals(ccfg.getName()))
+                ccfg.setGroupName(GROUP_2);
         }
 
-        CacheConfiguration ccfg3 = cacheConfiguration(igniteInstanceName)
-            .setName(CACHE_NAME_3)
-            .setGroupName(GROUP_2)
-            .setTopologyValidator(val2);
+        TestCacheGroupTopologyValidatorProvider topValidatorProvider = new TestCacheGroupTopologyValidatorProvider();
 
-        CacheConfiguration ccfg4 = cacheConfiguration(igniteInstanceName)
-            .setName(CACHE_NAME_4)
-            .setGroupName(GROUP_2)
-            .setTopologyValidator(val2);
+        if (isPluginTopValidatorProvider)
+            icfg.setPluginProviders(new TestCacheTopologyValidatorPluginProvider(topValidatorProvider));
+        else {
+            for (CacheConfiguration ccfg : ccfgs)
+                ccfg.setTopologyValidator(topValidatorProvider.topologyValidator(ccfg.getGroupName()));
+        }
 
-        return icfg.setCacheConfiguration(F.concat(ccfgs, ccfg3, ccfg4));
+        return icfg.setCacheConfiguration(ccfgs);
     }
 
     /**
@@ -82,6 +77,8 @@ public abstract class IgniteTopologyValidatorCacheGroupsAbstractTest extends Ign
      */
     @Test
     @Override public void testTopologyValidator() throws Exception {
+        startGrid(0);
+
         putValid(DEFAULT_CACHE_NAME);
         remove(DEFAULT_CACHE_NAME);
 
@@ -130,4 +127,27 @@ public abstract class IgniteTopologyValidatorCacheGroupsAbstractTest extends Ign
         putValid(CACHE_NAME_4);
         remove(CACHE_NAME_4);
     }
+
+    /** Provides validators only for {@code GROUP_1} and {@code GROUP_2}; other names get {@code null}. */
+    private static class TestCacheGroupTopologyValidatorProvider implements CacheTopologyValidatorProvider, Serializable {
+        /** {@inheritDoc} */
+        @Override public TopologyValidator topologyValidator(String grpName) {
+            if (GROUP_1.equals(grpName)) {
+                return new TopologyValidator() {
+                    @Override public boolean validate(Collection<ClusterNode> nodes) {
+                        return nodes.size() == 2;
+                    }
+                };
+            }
+            else if (GROUP_2.equals(grpName)) {
+                return new TopologyValidator() {
+                    @Override public boolean validate(Collection<ClusterNode> nodes) {
+                        return nodes.size() >= 2;
+                    }
+                };
+            }
+            else
+                return null;
+        }
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuite.java
index 4fddbe0..ba3e556 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.internal.processors.cache.CacheTopologyValidatorProviderTest;
 import org.apache.ignite.internal.processors.cache.IgniteTopologyValidatorGridSplitCacheTest;
 import org.apache.ignite.internal.processors.cache.IgniteTopologyValidatorNearPartitionedAtomicCacheGroupsTest;
 import org.apache.ignite.internal.processors.cache.IgniteTopologyValidatorNearPartitionedAtomicCacheTest;
@@ -61,6 +62,7 @@ public class IgniteTopologyValidatorTestSuite {
         GridTestUtils.addTestIfNeeded(suite, IgniteTopologyValidatorReplicatedTxCacheGroupsTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgniteTopologyValidatorGridSplitCacheTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CacheTopologyValidatorProviderTest.class, ignoredTests);
 
         return suite;
     }