Posted to commits@ignite.apache.org by mm...@apache.org on 2022/01/21 10:53:05 UTC

[ignite-extensions] branch master updated: IGNITE-16331 Add topology validator implementation extension. (#86)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new aa2a168  IGNITE-16331 Add topology validator implementation extension. (#86)
aa2a168 is described below

commit aa2a16864f7fa873ac86c0579b78f1225f77cb2d
Author: Mikhail Petrov <32...@users.noreply.github.com>
AuthorDate: Fri Jan 21 13:52:57 2022 +0300

    IGNITE-16331 Add topology validator implementation extension. (#86)
---
 modules/topology-validator-ext/README.md           |  68 +++
 .../modules/core/src/test/config/log4j-test.xml    |  40 ++
 .../modules/core/src/test/config/tests.properties  |   0
 modules/topology-validator-ext/pom.xml             |  67 +++
 .../CacheTopologyValidatorPluginProvider.java      | 372 +++++++++++++
 .../cache/IgniteCacheTopologyValidatorTest.java    | 616 +++++++++++++++++++++
 .../IgniteCacheTopologyValidatorTestSuite.java     |  27 +
 pom.xml                                            |   1 +
 8 files changed, 1191 insertions(+)
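
Note on the write-blocking rule: the README and isValidTopology() in this commit accept a segment only when the number of alive baseline nodes is at least floor(total * threshold) + 1. The following is a minimal standalone sketch of that arithmetic, not the plugin itself. The class name ThresholdSketch and its plain-integer parameters are hypothetical and exist only for illustration.

```java
/** Hypothetical standalone sketch of the comparison used by isValidTopology() in this commit. */
class ThresholdSketch {
    /**
     * @param aliveBaselineNodes Baseline nodes still present in the segment.
     * @param totalBaselineNodes All configured baseline nodes.
     * @param threshold Deactivation threshold, assumed to be in the range [0.5, 1).
     * @return {@code true} if the segment may keep accepting cache writes.
     */
    static boolean isValidTopology(int aliveBaselineNodes, int totalBaselineNodes, float threshold) {
        // As in the commit, a topology without alive baseline nodes is not treated as segmented.
        if (aliveBaselineNodes == 0)
            return true;

        // Strictly more than the configured fraction of baseline nodes must remain.
        return aliveBaselineNodes >= ((int)(totalBaselineNodes * threshold)) + 1;
    }

    public static void main(String[] args) {
        System.out.println(isValidTopology(3, 4, 0.5F));  // true: 3 >= 2 + 1
        System.out.println(isValidTopology(2, 4, 0.5F));  // false: an equal split blocks both halves
        System.out.println(isValidTopology(3, 5, 0.5F));  // true: 3 >= 2 + 1
        System.out.println(isValidTopology(3, 4, 0.75F)); // false: 3 < 3 + 1
    }
}
```

With the default threshold of 0.5, a four-node baseline split 2/2 leaves neither half writable, which matches rule 2 of the README.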

diff --git a/modules/topology-validator-ext/README.md b/modules/topology-validator-ext/README.md
new file mode 100644
index 0000000..3e61055
--- /dev/null
+++ b/modules/topology-validator-ext/README.md
@@ -0,0 +1,68 @@
+# What problem is this module intended to solve?
+
+Some network issues can cause the Ignite cluster to split into several isolated parts - segments. Nodes from different 
+segments cannot communicate with each other, while nodes from the same segment do not experience communication problems. 
+In this case, each segment marks the nodes with which the connection was lost as failed and considers itself an 
+independent Ignite cluster. Let's call this scenario cluster segmentation.
+
+Cluster segmentation can lead to cache data inconsistency across different segments because each segment can continue 
+to handle cache update requests independently.
+
+Apache Ignite allows the user to provide custom validation logic for Ignite caches that is applied on 
+each topology change; if the validation fails, writes to the corresponding cache are blocked. This 
+validation logic is passed to Ignite as a TopologyValidator interface implementation. It can be supplied through the cache
+configuration or through the Ignite plugin extension mechanism (see the CacheTopologyValidatorProvider interface). 
+
+This module is an implementation of an Ignite plugin that guarantees that, 
+after cluster segmentation, no more than one segment can process write requests to all caches. This is achieved by
+providing an implementation of the TopologyValidator interface, as mentioned above. 
+
+The current implementation of TopologyValidator uses the Ignite baseline nodes remaining in the topology to detect 
+segmentation.
+
+# In what cases will cache writes be blocked for a segment?
+
+The following rules are used to determine which segment can process cache write requests after segmentation and which 
+cannot:
+
+1. A segment is allowed to process cache write requests after segmentation if and only if more than the configured
+fraction of the baseline nodes remains in the segment; otherwise, all writes to the cache are blocked.
+2. If the cluster is split into two equal segments, writes to both of them are blocked. 
+3. Since Ignite treats segmentation as sequential node failures, even a single node failure in an active cluster in
+which the number of alive baseline nodes is less than or equal to the segmentation threshold is considered segmentation and results
+in a write block for all caches.
+
+# Configuration
+
+1. Configure CacheTopologyValidatorPluginProvider on each server node:
+
+   ```
+   new IgniteConfiguration() 
+       ... 
+       .setPluginProviders(new CacheTopologyValidatorPluginProvider());
+   ```
+
+2. Configure baseline nodes explicitly, or configure baseline auto adjustment with a timeout that significantly 
+exceeds the node failure detection timeout. This can be done through the Java API or the control script. 
+See [1] and [2] for more info.
+
+Note that baseline auto adjustment with a zero timeout is not supported by the current TopologyValidator 
+implementation: with that setting the validator skips validation of topology changes.
+
+3. Configure the deactivation threshold.
+The deactivation threshold is the fraction of baseline nodes that must remain in the topology for 
+the segment to be considered valid and continue to accept write requests.
+The value must be in the range from 0.5 (inclusive) to 1 (exclusive), or 0 to disable validation. The default value is 0.5. If the
+default value suits you, no further action is required.
+
+To set a custom deactivation threshold, set the `org.apache.ignite.topology.validator.deactivation.threshold` 
+distributed configuration property via the control script (see https://ignite.apache.org/docs/latest/tools/control-script#working-with-cluster-properties). 
+
+# Manual segmentation resolution
+
+The state of each segment for which cache writes were blocked will eventually be switched to READ-ONLY mode. 
+Manually switching the cluster state back to ACTIVE mode restores cache write availability. This can be done through 
+the Java API or the control script. See [1] and [2] for more info. 
+
+[1] - https://ignite.apache.org/docs/latest/clustering/baseline-topology \
+[2] - https://ignite.apache.org/docs/latest/tools/control-script#activation-deactivation-and-topology-management
\ No newline at end of file
diff --git a/modules/topology-validator-ext/modules/core/src/test/config/log4j-test.xml b/modules/topology-validator-ext/modules/core/src/test/config/log4j-test.xml
new file mode 100644
index 0000000..323c348
--- /dev/null
+++ b/modules/topology-validator-ext/modules/core/src/test/config/log4j-test.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+    "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <param name="Threshold" value="DEBUG"/>
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="levelMin" value="DEBUG"/>
+            <param name="levelMax" value="INFO"/>
+        </filter>
+    </appender>
+
+    <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.err"/>
+
+        <param name="Threshold" value="WARN"/>
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+    </appender>
+
+    <category name="org">
+        <level value="INFO"/>
+    </category>
+
+    <root>
+        <level value="INFO"/>
+
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="CONSOLE_ERR"/>
+    </root>
+</log4j:configuration>
diff --git a/modules/topology-validator-ext/modules/core/src/test/config/tests.properties b/modules/topology-validator-ext/modules/core/src/test/config/tests.properties
new file mode 100644
index 0000000..e69de29
diff --git a/modules/topology-validator-ext/pom.xml b/modules/topology-validator-ext/pom.xml
new file mode 100644
index 0000000..05fa3b2
--- /dev/null
+++ b/modules/topology-validator-ext/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-extensions-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-topology-validator-ext</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${ignite.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${ignite.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${ignite.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${ignite.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java b/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
new file mode 100644
index 0000000..b3c8d48
--- /dev/null
+++ b/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
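+/** Plugin provider whose topology validator blocks cache writes in segments that retain too few baseline nodes. */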
+public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator {
+    /**
+     * Decimal fraction of nodes that determines how many nodes must remain in the baseline topology for the
+     * segment to be considered valid and continue to accept write requests after segmentation. This value must be in
+     * the range from 0.5 (inclusive) to 1 (exclusive), or 0 if validation should be disabled.
+     *
+     * @see #DFLT_DEACTIVATION_THRESHOLD that is used as a default value.
+     */
+    public static final String TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME =
+        "org.apache.ignite.topology.validator.deactivation.threshold";
+
+    /** */
+    public static final float DFLT_DEACTIVATION_THRESHOLD = 0.5F;
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Float> deactivateThresholdProp = new SimpleDistributedProperty<>(
+        TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME,
+        str -> {
+            float res = Float.parseFloat(str);
+
+            if ((res < 0.5F || res >= 1F) && res != 0F) {
+                throw new IgniteException("Topology validator cluster deactivation threshold must be a decimal" +
+                    " fraction in the range from 0.5 (inclusively) to 1 or 0 if validation should be disabled.");
+            }
+
+            return res;
+        }
+    );
+
+    /** Ignite kernel context. */
+    private GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** Version of the last topology handled by {@link TopologyChangedEventListener}. */
+    private long lastCheckedTopVer;
+
+    /**
+     * {@code null} value means that segmentation happened, cache writes were blocked and cluster is in process of
+     * switching its state to READ-ONLY mode.
+     */
+    private volatile ClusterState state;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Topology Validator";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin() {
+        return (T) new IgnitePlugin() {
+            // No-op.
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) {
+        ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+        if (!ctx.clientNode()) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() {
+                /** {@inheritDoc} */
+                @Override public TopologyValidator topologyValidator(String cacheName) {
+                    return CacheTopologyValidatorPluginProvider.this;
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext pluginCtx) {
+        if (ctx.clientNode())
+            return;
+
+        log = ctx.log(getClass());
+
+        state = ctx.state().clusterState().state();
+
+        ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            (topVer, snd, msg) -> state = msg.state()
+        );
+
+        ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) ->
+            validateBaselineConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal));
+
+        ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, oldVal, newVal) ->
+            validateBaselineConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout())
+        );
+
+        ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(deactivateThresholdProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    float deactivationThreshold = U.isLocalNodeCoordinator(ctx.discovery())
+                        ? DFLT_DEACTIVATION_THRESHOLD
+                        : deactivateThresholdProp.getOrDefault(0F);
+
+                    if (deactivationThreshold == 0F) {
+                        U.warn(log, "Topology Validator will be disabled because it is not configured for the" +
+                            " cluster the current node joined. Make sure the Topology Validator plugin is" +
+                            " configured on all cluster nodes.");
+                    }
+
+                    setDefaultValue(deactivateThresholdProp, deactivationThreshold, log);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+        if (ctx.localNodeId().equals(nodeId))
+            state = (ClusterState)data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException {
+        if (node.isClient())
+            return;
+
+        if (data == null) {
+            String msg = "The Topology Validator plugin is not configured for the server node that is" +
+                " trying to join the cluster. Since the Topology Validator is only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']';
+
+            throw new PluginValidationException(msg, msg, node.id());
+        }
+
+        // If a new node is joining while some node failed/left events have not yet been handled by
+        // {@link TopologyChangedEventListener}, we cannot guarantee that the {@code state} on the joining node will
+        // be consistent with the one on the cluster nodes.
+        if (state == ACTIVE) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv)) {
+                        String msg =  "Node join request was rejected due to concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + ']';
+
+                        throw new PluginValidationException(msg, msg, node.id());
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean validate(Collection<ClusterNode> nodes) {
+        return isDisabled() || state != null;
+    }
+
+    /** */
+    private boolean isDisabled() {
+        return deactivateThresholdProp.getOrDefault(0F) == 0F;
+    }
+
+    /** */
+    private boolean isValidTopology(Collection<? extends BaselineNode> baselineNodes) {
+        int aliveBaselineNodes = F.size(baselineNodes, n -> !(n instanceof DetachedClusterNode));
+
+        if (aliveBaselineNodes == 0)
+            return true;
+
+        float threshold = deactivateThresholdProp.getOrDefault(DFLT_DEACTIVATION_THRESHOLD);
+
+        assert threshold >= 0.5F && threshold < 1;
+
+        // Ignite treats segmentation as a sequence of node failures, so segmentation is detected even when
+        // a single node fails and the alive baseline nodes no longer exceed the configured threshold fraction.
+        return aliveBaselineNodes >= ((int)(baselineNodes.size() * threshold)) + 1;
+    }
+
+    /** */
+    private void validateBaselineConfiguration(Boolean enabled, Long autoAdjustmentTimeout) {
+        if (isDisabled() || enabled == null || autoAdjustmentTimeout == null)
+            return;
+
+        if (!isBaselineConfigurationCompatible(enabled, autoAdjustmentTimeout)) {
+            LT.warn(log, "Topology Validator is currently skipping validation of topology changes because" +
+                " Baseline Auto Adjustment with zero timeout is configured for the cluster. Configure Baseline Nodes" +
+                " explicitly or set Baseline Auto Adjustment Timeout to greater than zero.");
+        }
+    }
+
+    /**
+     * The current implementation of segmentation detection compares the nodes of each topology with the configured
+     * baseline nodes. If baseline auto adjustment is configured with a zero timeout, the baseline is updated on each
+     * topology change, so the comparison described above makes no sense.
+     */
+    private boolean isBaselineConfigurationCompatible(boolean enabled, long autoAdjustmentTimeout) {
+        return !(enabled && autoAdjustmentTimeout == 0L);
+    }
+
+    /** */
+    private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
+            ClusterState locStateCopy = state;
+
+            lastCheckedTopVer = evt.topologyVersion();
+
+            boolean isTopValidationApplicable =
+                !isDisabled() &&
+                state == ACTIVE &&
+                evt.type() == EVT_NODE_FAILED  &&
+                isBaselineConfigurationCompatible(
+                    ctx.state().isBaselineAutoAdjustEnabled(),
+                    ctx.state().baselineAutoAdjustTimeout()
+                );
+
+            if (isTopValidationApplicable && !isValidTopology(discoCache.baselineNodes())) {
+                locStateCopy = null;
+
+                try {
+                    ctx.closure().runLocal(new GridPlainRunnable() {
+                        @Override public void run() {
+                            try {
+                                ctx.cluster().get().state(ACTIVE_READ_ONLY);
+                            }
+                            catch (Throwable e) {
+                                U.error(log,
+                                "Failed to automatically switch state of the segmented cluster to the READ-ONLY" +
+                                    " mode. Cache writes were already restricted for all configured caches, but this" +
+                                    " step is still required in order to be able to unlock cache writes in the future." +
+                                    " Retry this operation manually, if possible [segmentedNodes=" +
+                                    F.viewReadOnly(discoCache.allNodes(), F.node2id()) + "]", e);
+                            }
+                        }
+                    }, PUBLIC_POOL);
+                }
+                catch (Throwable e) {
+                    U.error(log, "Failed to schedule cluster state change to the READ-ONLY mode.", e);
+                }
+
+                U.warn(log, "Cluster segmentation was detected. Write to all user caches were blocked" +
+                    " [segmentedNodes=" + F.viewReadOnly(discoCache.allNodes(), F.node2id()) + ']');
+            }
+
+            state = locStateCopy;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
+    }
+}
diff --git a/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTest.java b/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTest.java
new file mode 100644
index 0000000..d8a3f7b
--- /dev/null
+++ b/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTest.java
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.cache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils.RunnableX;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
+import static org.apache.ignite.plugin.cache.CacheTopologyValidatorPluginProvider.TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+@RunWith(Parameterized.class)
+public class IgniteCacheTopologyValidatorTest extends IgniteCacheTopologySplitAbstractTest {
+    /** */
+    private static final String LOCAL_HOST = "localhost";
+
+    /** */
+    private static final int CACHE_KEY_CNT = 1000;
+
+    /** */
+    public static final int CACHE_CNT = 2;
+
+    /** */
+    @Parameterized.Parameter
+    public boolean isPersistenceEnabled;
+
+    /** */
+    @Parameterized.Parameters(name = "isPersistenceEnabled={0}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[]{true}, new Object[]{false});
+    }
+
+    /** */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return getConfiguration(igniteInstanceName, true);
+    }
+
+    /** */
+    private IgniteConfiguration getConfiguration(
+        String igniteInstanceName,
+        boolean configureSegmentationResolverPlugin
+    ) throws Exception {
+        int idx = getTestIgniteInstanceIndex(igniteInstanceName);
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+            .setUserAttributes(singletonMap(IDX_ATTR, idx));
+
+        if (configureSegmentationResolverPlugin)
+            cfg.setPluginProviders(new CacheTopologyValidatorPluginProvider());
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi())
+            .setIpFinder(sharedStaticIpFinder)
+            .setLocalPortRange(1)
+            .setLocalPort(discoPort(idx))
+            .setConnectionRecoveryTimeout(0);
+
+        if (isPersistenceEnabled) {
+            cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setInitialSize(100L * 1024 * 1024)
+                    .setMaxSize(200L * 1024 * 1024)
+                    .setPersistenceEnabled(true)));
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean isBlocked(int locPort, int rmtPort) {
+        return isDiscoPort(locPort) && isDiscoPort(rmtPort) && segment(locPort) != segment(rmtPort);
+    }
+
+    /** @return Segment index (0 or 1) derived from the parity of the discovery port. */
+    private int segment(int discoPort) {
+        return (discoPort - DFLT_PORT) % 2 == 0 ? 0 : 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int segment(ClusterNode node) {
+        return node.<Integer>attribute(IDX_ATTR) % 2 == 0 ? 0 : 1;
+    }
+
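+    /** Checks that validation stays disabled (zero threshold) when the first node was started without the plugin. */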
+    @Test
+    public void testConnectionToIncompatibleCluster() throws Exception {
+        startGrid(getConfiguration(getTestIgniteInstanceName(0), false));
+
+        startGrid(1);
+
+        if (isPersistenceEnabled)
+            grid(0).cluster().state(ACTIVE);
+        else
+            grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+        assertTrue(waitForCondition(
+            () -> 0F == (Float)grid(1).context()
+                .distributedConfiguration()
+                .property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+                .get(),
+            getTestTimeout()
+        ));
+
+        splitAndWait();
+
+        connectNodeToSegment(3, false, 1);
+
+        assertTrue(waitForCondition(
+            () -> 0F == (Float)grid(3).context()
+                .distributedConfiguration()
+                .property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+                .get(),
+            getTestTimeout()
+        ));
+    }
+
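+    /** Checks that a server node without the plugin is rejected on join, while a client node without it is not. */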
+    @Test
+    public void testIncompatibleNodeConnection() throws Exception {
+        prepareCluster(1);
+
+        assertThrowsAnyCause(
+            log,
+            () -> startGrid(getConfiguration(getTestIgniteInstanceName(1), false)),
+            IgniteSpiException.class,
+            "The Topology Validator plugin is not configured for the server node that is trying to join the cluster."
+        );
+
+        startClientGrid(getConfiguration(getTestIgniteInstanceName(2), false));
+
+        assertEquals(2, grid(0).cluster().nodes().size());
+
+        checkPutGet(G.allGrids(), true);
+    }
+
+    /** Checks that nodes joining a segment in which writes are blocked cannot write either. */
+    @Test
+    public void testConnectionToSegmentedCluster() throws Exception {
+        prepareCluster(6);
+
+        stopGrid(4);
+        stopGrid(5);
+
+        splitAndWait();
+
+        checkPutGet(G.allGrids(), false);
+
+        connectNodeToSegment(4, false, 0);
+        connectNodeToSegment(6, true, 0);
+
+        checkPutGet(0, false);
+
+        connectNodeToSegment(5, false, 1);
+        connectNodeToSegment(7, true, 1);
+
+        checkPutGet(1, false);
+
+        stopSegmentNodes(1);
+
+        unsplit();
+
+        startGrid(1);
+
+        checkPutGet(G.allGrids(), false);
+    }
+
+    /** Checks that writes stay allowed across regular server and client node starts and stops. */
+    @Test
+    public void testRegularNodeStartStop() throws Exception {
+        prepareCluster(1);
+
+        checkPutGetAfter(() -> startGrid(1));
+        checkPutGetAfter(() -> stopGrid(1));
+
+        checkPutGetAfter(() -> startClientGrid(2));
+        checkPutGetAfter(() -> stopGrid(2));
+
+        startGrid(1);
+
+        grid(0).cluster().setBaselineTopology(grid(0).cluster().topologyVersion());
+
+        checkPutGetAfter(() -> startGrid(3));
+        checkPutGetAfter(() -> stopGrid(3));
+
+        checkPutGetAfter(() -> stopGrid(1));
+
+        checkPutGetAfter(() -> startClientGrid(2));
+        checkPutGetAfter(() -> stopGrid(2));
+    }
+
+    /** Checks that the failure of a client node does not block writes. */
+    @Test
+    public void testClientNodeSegmentationIgnored() throws Exception {
+        prepareCluster(1);
+
+        startClientGrid(1);
+
+        failNode(1, Collections.singleton(grid(0)));
+
+        checkPutGet(Collections.singleton(grid(0)), true);
+    }
+
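+    /** Checks that writes stay allowed after a split of an in-memory cluster with no explicit baseline setup. */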
+    @Test
+    public void testSplitWithoutBaseline() throws Exception {
+        Assume.assumeFalse(isPersistenceEnabled);
+
+        startGridsMultiThreaded(4);
+
+        createCaches();
+
+        splitAndWait();
+
+        checkPutGet(G.allGrids(), true);
+    }
+
+    /** */
+    @Test
+    public void testSplitWithBaseline() throws Exception {
+        prepareCluster(3);
+
+        startGrid(3);
+
+        splitAndWait();
+
+        connectNodeToSegment(4, true, 0);
+        connectNodeToSegment(5, true, 1);
+
+        checkPutGet(0, true);
+        checkPutGet(1, false);
+
+        assertTrue(waitForCondition(() -> ACTIVE_READ_ONLY == grid(1).cluster().state(), getTestTimeout()));
+
+        stopSegmentNodes(1);
+        stopGrid(4);
+
+        unsplit();
+
+        startGrid(1);
+        startGrid(3);
+
+        grid(0).cluster().setBaselineTopology(grid(0).cluster().topologyVersion());
+
+        awaitPartitionMapExchange(true, true, null);
+
+        checkPutGet(G.allGrids(), true);
+
+        splitAndWait();
+
+        checkPutGet(G.allGrids(), false);
+
+        assertTrue(waitForCondition(() -> ACTIVE_READ_ONLY == grid(1).cluster().state(), getTestTimeout()));
+        assertTrue(waitForCondition(() -> ACTIVE_READ_ONLY == grid(0).cluster().state(), getTestTimeout()));
+
+        grid(0).cluster().state(ACTIVE);
+
+        checkPutGet(0, true);
+        checkPutGet(1, false);
+    }
+
+    /** */
+    @Test
+    public void testConsequentSegmentationResolving() throws Exception {
+        prepareCluster(4);
+
+        splitAndWait();
+
+        checkPutGet(G.allGrids(), false);
+
+        grid(1).cluster().state(ACTIVE);
+
+        checkPutGet(0, false);
+        checkPutGet(1, true);
+
+        stopSegmentNodes(0);
+
+        unsplit();
+
+        failNode(1, Collections.singleton(grid(3)));
+
+        checkPutGet(Collections.singleton(grid(3)), false);
+
+        grid(3).cluster().state(ACTIVE);
+
+        checkPutGet(Collections.singleton(grid(3)), true);
+    }
+
+    /** Checks that the deactivation threshold determines whether a node failure blocks writes. */
+    @Test
+    public void testDeactivationThreshold() throws Exception {
+        prepareCluster(5);
+
+        grid(0).context().distributedConfiguration().property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+            .propagate(0.8F);
+
+        Collection<Ignite> segment = new ArrayList<>(G.allGrids());
+
+        segment.remove(grid(0));
+
+        failNode(0, segment);
+
+        checkPutGet(segment, false);
+
+        stopGrid(0);
+
+        grid(1).cluster().state(ACTIVE);
+
+        grid(1).context().distributedConfiguration().property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+            .propagate(0.5F);
+
+        segment.remove(grid(4));
+
+        failNode(4, segment);
+
+        checkPutGet(segment, true);
+    }
+
+    /** Checks that a zero deactivation threshold disables validation and a nonzero one re-enables it. */
+    @Test
+    public void testValidationDisabled() throws Exception {
+        prepareCluster(4);
+
+        grid(1).context().distributedConfiguration().property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+            .propagate(0F);
+
+        splitAndWait();
+
+        connectNodeToSegment(4, true, 0);
+        connectNodeToSegment(5, true, 1);
+
+        checkPutGet(G.allGrids(), true);
+
+        stopSegmentNodes(0);
+
+        unsplit();
+
+        grid(1).context().distributedConfiguration().property(TOP_VALIDATOR_DEACTIVATION_THRESHOLD_PROP_NAME)
+            .propagate(0.5F);
+
+        failNode(1, Collections.singleton(grid(3)));
+
+        checkPutGet(Collections.singleton(grid(3)), false);
+    }
+
+    /** */
+    @Test
+    public void testNodeJoinWithHalfBaselineNodesLeft() throws Exception {
+        prepareCluster(4);
+
+        stopGrid(0);
+        stopGrid(1);
+        stopGrid(2);
+
+        checkPutGet(G.allGrids(), true);
+
+        startGrid(0);
+
+        checkPutGet(G.allGrids(), true);
+    }
+
+    /** Checks that a node join is rejected while a node left event is still being handled. */
+    @Test
+    public void testNodeJoinConcurrentWithLeftRejected() throws Exception {
+        prepareCluster(2);
+
+        CountDownLatch discoveryWorkerBlockedLatch = new CountDownLatch(1);
+
+        try {
+            grid(0).events().localListen(evt -> {
+                try {
+                    discoveryWorkerBlockedLatch.await();
+                }
+                catch (InterruptedException e) {
+                    U.error(log, e);
+
+                    Thread.currentThread().interrupt();
+                }
+
+                return true;
+            }, EVT_NODE_JOINED);
+
+            startGrid(2);
+
+            stopGrid(1);
+
+            assertThrowsAnyCause(
+                log,
+                () -> startGrid(1),
+                IgniteSpiException.class,
+                "Node join request was rejected due to concurrent node left process handling"
+            );
+        }
+        finally {
+            discoveryWorkerBlockedLatch.countDown();
+        }
+    }
+
+    /** */
+    @Test
+    public void testPreconfiguredClusterState() throws Exception {
+        Assume.assumeFalse(isPersistenceEnabled);
+
+        startGrid(0);
+
+        startGrid(getConfiguration(getTestIgniteInstanceName(1)).setClusterStateOnStart(ACTIVE_READ_ONLY));
+
+        grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+        createCaches();
+
+        splitAndWait();
+
+        checkPutGet(G.allGrids(), false);
+    }
+
+    /** Starts a node whose IP finder lists only the server nodes of the given segment. */
+    private IgniteEx connectNodeToSegment(int nodeIdx, boolean isClient, int segment) throws Exception {
+        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(nodeIdx));
+
+        List<String> segmentDiscoPorts = segmentNodes(segment, false).stream()
+            .map(node -> LOCAL_HOST + ':' + discoPort(node.localNode().<Integer>attribute(IDX_ATTR)))
+            .collect(toList());
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(segmentDiscoPorts));
+
+        cfg.setClientMode(isClient);
+
+        return startGrid(cfg);
+    }
+
+    /** @return {@code true} if the port falls within the default discovery port range. */
+    private boolean isDiscoPort(int port) {
+        return port >= DFLT_PORT &&
+            port <= (DFLT_PORT + DFLT_PORT_RANGE);
+    }
+
+    /** Creates the test caches and checks that all nodes accept writes to them. */
+    public void createCaches() {
+        for (int cacheIdx = 0; cacheIdx < CACHE_CNT; cacheIdx++) {
+            grid(0).createCache(new CacheConfiguration<>()
+                .setName(cacheName(cacheIdx))
+                .setWriteSynchronizationMode(FULL_SYNC)
+                .setCacheMode(REPLICATED)
+                .setReadFromBackup(false));
+        }
+
+        checkPutGet(G.allGrids(), true);
+    }
+
+    /** Runs the action, then checks that all nodes accept writes. */
+    private void checkPutGetAfter(RunnableX r) {
+        r.run();
+
+        checkPutGet(G.allGrids(), true);
+    }
+
+    /** Checks put/get on all nodes of the segment, clients included. */
+    private void checkPutGet(int segment, boolean successfulPutExpected) {
+        checkPutGet(segmentNodes(segment, true), successfulPutExpected);
+    }
+
+    /** @return Name of the cache with the given index. */
+    private String cacheName(int cacheIdx) {
+        return DEFAULT_CACHE_NAME + '_' + cacheIdx;
+    }
+
+    /** @return Discovery port of the node with the given index. */
+    private int discoPort(int idx) {
+        return DFLT_PORT + idx;
+    }
+
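+    /** Checks on every node and cache that puts succeed or fail as expected and that gets return stored values. */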
+    private void checkPutGet(Collection<? extends Ignite> nodes, boolean successfulPutExpected) {
+        assertFalse(nodes.isEmpty());
+
+        for (Ignite node : nodes) {
+            for (int cacheIdx = 0; cacheIdx < CACHE_CNT; cacheIdx++) {
+                IgniteCache<Object, Object> cache = node.cache(cacheName(cacheIdx));
+
+                for (int i = 0; i < CACHE_KEY_CNT; i++) {
+                    int key = i;
+
+                    if (!successfulPutExpected) {
+                        assertThrowsAnyCause(
+                            null,
+                            () -> {
+                                cache.put(key, key);
+
+                                return null;
+                            },
+                            CacheInvalidStateException.class,
+                            "Failed to perform cache operation");
+                    }
+                    else
+                        cache.put(key, key);
+
+                    assertEquals(key, cache.get(key));
+                }
+            }
+        }
+    }
+
+    /** Stops all nodes of the segment, removing their persistence folders if persistence is enabled. */
+    private void stopSegmentNodes(int segment) throws Exception {
+        for (IgniteEx node : segmentNodes(segment, true)) {
+            if (isPersistenceEnabled) {
+                String pdsFolder = node.context().pdsFolderResolver().resolveFolders().folderName();
+
+                stopGrid(node.name());
+
+                cleanPersistenceDir(pdsFolder);
+            }
+            else
+                stopGrid(node.name());
+        }
+    }
+
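+    /** Disconnects the node from discovery and waits for the given nodes to finish the resulting exchange. */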
+    private void failNode(int idx, Collection<Ignite> awaitingNodes) {
+        assertFalse(awaitingNodes.isEmpty());
+
+        long topVer = awaitingNodes.iterator().next().cluster().topologyVersion();
+
+        TcpDiscoverySpi spi = (TcpDiscoverySpi)grid(idx).context().discovery().getInjectedDiscoverySpi();
+
+        spi.setClientReconnectDisabled(true);
+        spi.disconnect();
+
+        awaitExchangeVersionFinished(awaitingNodes, topVer + 1);
+    }
+
+    /** @return Nodes of the given segment, optionally including clients. */
+    private Collection<IgniteEx> segmentNodes(int segment, boolean includeClients) {
+        return G.allGrids().stream()
+            .filter(ignite -> includeClients || !ignite.cluster().localNode().isClient())
+            .filter(ignite -> segment(ignite.cluster().localNode()) == segment)
+            .map(ignite -> (IgniteEx)ignite)
+            .collect(Collectors.toList());
+    }
+
+    /** Starts the given number of nodes, fixes the baseline and creates the test caches. */
+    private void prepareCluster(int nodes) throws Exception {
+        startGridsMultiThreaded(nodes);
+
+        if (isPersistenceEnabled)
+            grid(0).cluster().state(ACTIVE);
+        else
+            grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+        createCaches();
+    }
+}
diff --git a/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTestSuite.java b/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTestSuite.java
new file mode 100644
index 0000000..322da1f
--- /dev/null
+++ b/modules/topology-validator-ext/src/test/java/org/apache/ignite/plugin/cache/IgniteCacheTopologyValidatorTestSuite.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.cache;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/** Ignite topology validator test suite. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({IgniteCacheTopologyValidatorTest.class})
+public class IgniteCacheTopologyValidatorTestSuite {
+}
diff --git a/pom.xml b/pom.xml
index 9aebb25..e99c1fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
         <module>modules/azure-ext</module>
         <module>modules/gce-ext</module>
         <module>modules/zookeeper-ip-finder-ext</module>
+        <module>modules/topology-validator-ext</module>
     </modules>
 
     <profiles>